This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 1bfbdc9  Revert "[FLINK-26986][python] Remove deprecated string expressions in Python Table API"
1bfbdc9 is described below

commit 1bfbdc9c76de3b3fab4d70058257b7a307269570
Author: Dian Fu <[email protected]>
AuthorDate: Sat Apr 2 09:58:17 2022 +0800

    Revert "[FLINK-26986][python] Remove deprecated string expressions in Python Table API"
    
    This reverts commit 02c5e4136c809eac7b5d723be0d043b639ddf477.
---
 .../python/table/python_table_api_connectors.md    |   2 +-
 .../docs/dev/python/table/udfs/python_udfs.md      |  12 +-
 .../python/table/udfs/vectorized_python_udfs.md    |   3 +-
 docs/content.zh/docs/dev/table/catalogs.md         |   2 +-
 docs/content.zh/docs/dev/table/tableApi.md         |  15 +-
 .../python/table/python_table_api_connectors.md    |   2 +-
 .../docs/dev/python/table/udfs/python_udfs.md      |  12 +-
 .../python/table/udfs/vectorized_python_udfs.md    |   3 +-
 docs/content/docs/dev/table/catalogs.md            |   2 +-
 docs/content/docs/dev/table/tableApi.md            |  15 +-
 .../pyflink/examples/table/pandas/pandas_udaf.py   |   2 +-
 .../examples/table/windowing/over_window.py        |   6 +-
 .../examples/table/windowing/session_window.py     |   2 +-
 .../examples/table/windowing/sliding_window.py     |   2 +-
 .../examples/table/windowing/tumble_window.py      |   2 +-
 flink-python/pyflink/table/table.py                | 214 +++++++++++++++------
 flink-python/pyflink/table/window.py               |   7 +
 17 files changed, 204 insertions(+), 99 deletions(-)

diff --git 
a/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md 
b/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
index 6be2c1e..dbf94fb 100644
--- a/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
+++ b/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
@@ -139,7 +139,7 @@ import numpy as np
 
 # 创建一个 PyFlink 表
 pdf = pd.DataFrame(np.random.rand(1000, 2))
-table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)
+table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")
 
 # 将 PyFlink 表转换成 Pandas DataFrame
 pdf = table.to_pandas()
diff --git a/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md 
b/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
index f16c433..124d1bb 100644
--- a/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
+++ b/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
@@ -128,7 +128,7 @@ add = udf(functools.partial(partial_add, k=1), 
result_type=DataTypes.BIGINT())
 # 注册 Python 自定义函数
 table_env.create_temporary_function("add", add)
 # 在 Python Table API 中使用 Python 自定义函数
-my_table.select(call('add', my_table.a, my_table.b))
+my_table.select("add(a, b)")
 
 # 也可以在 Python Table API 中直接使用 Python 自定义函数
 my_table.select(add(my_table.a, my_table.b))
@@ -156,8 +156,8 @@ my_table = ...  # type: Table, table schema: [a: String]
 split = udtf(Split(), result_types=[DataTypes.STRING(), DataTypes.INT()])
 
 # 在 Python Table API 中使用 Python 表值函数
-my_table.join_lateral(split(my_table.a).alias("word", "length"))
-my_table.left_outer_join_lateral(split(my_table.a).alias("word", "length"))
+my_table.join_lateral(split(my_table.a).alias("word, length"))
+my_table.left_outer_join_lateral(split(my_table.a).alias("word, length"))
 
 # 在 SQL API 中使用 Python 表值函数
 table_env.create_temporary_function("split", udtf(Split(), 
result_types=[DataTypes.STRING(), DataTypes.INT()]))
@@ -194,8 +194,8 @@ my_table = ...  # type: Table, table schema: [a: String]
 table_env.create_java_temporary_function("split", "my.java.function.Split")
 
 # 在 Python Table API 中使用表值函数。 "alias"指定表的字段名称。
-my_table.join_lateral(call('split', my_table.a).alias("word", 
"length")).select(my_table.a, col('word'), col('length'))
-my_table.left_outer_join_lateral(call('split', my_table.a).alias("word", 
"length")).select(my_table.a, col('word'), col('length'))
+my_table.join_lateral(call('split', my_table.a).alias("word, 
length")).select(my_table.a, col('word'), col('length'))
+my_table.left_outer_join_lateral(call('split', my_table.a).alias("word, 
length")).select(my_table.a, col('word'), col('length'))
 
 # 注册 Python 函数。
 
@@ -337,7 +337,7 @@ tumble_window = Tumble.over(lit(1).hours) \
 
 result = t.window(tumble_window) \
         .group_by(col('w'), col('name')) \
-        .select(col('w').start, col('w').end, weighted_avg(col('value'), 
col('count'))) \
+        .select("w.start, w.end, weighted_avg(value, count)") \
         .to_pandas()
 print(result)
 
diff --git 
a/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md 
b/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md
index 10012d8..af6c7f6 100644
--- a/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md
+++ b/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md
@@ -101,7 +101,8 @@ tumble_window = Tumble.over(expr.lit(1).hours) \
 
 my_table.window(tumble_window) \
     .group_by("w") \
-    .select(col('w').start, col('w').end, mean_udaf(col('b')))
+    .select("w.start, w.end, mean_udaf(b)")
+
 
 # 在 Over Window Aggregation 中使用向量化聚合函数
 table_env.create_temporary_function("mean_udaf", mean_udaf)
diff --git a/docs/content.zh/docs/dev/table/catalogs.md 
b/docs/content.zh/docs/dev/table/catalogs.md
index 05722d8..0b86fbd 100644
--- a/docs/content.zh/docs/dev/table/catalogs.md
+++ b/docs/content.zh/docs/dev/table/catalogs.md
@@ -241,7 +241,7 @@ schema = Schema.new_builder() \
     
 catalog_table = t_env.create_table("myhive.mydb.mytable", 
TableDescriptor.for_connector("kafka")
     .schema(schema)
-    # …
+    // …
     .build())
 
 # tables should contain "mytable"
diff --git a/docs/content.zh/docs/dev/table/tableApi.md 
b/docs/content.zh/docs/dev/table/tableApi.md
index d2b198c..65a9f9d 100644
--- a/docs/content.zh/docs/dev/table/tableApi.md
+++ b/docs/content.zh/docs/dev/table/tableApi.md
@@ -417,7 +417,7 @@ val orders: Table = tableEnv.from("Orders").as("x", "y", 
"z", "t")
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.alias("x", "y", "z", "t")
+result = orders.alias("x, y, z, t")
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1054,7 +1054,7 @@ def split(x):
 
 # join
 orders = t_env.from_path("Orders")
-joined_table = orders.join_lateral(split(orders.c).alias("s", "t", "v"))
+joined_table = orders.join_lateral(split(orders.c).alias("s, t, v"))
 result = joined_table.select(joined_table.a, joined_table.b, joined_table.s, 
joined_table.t, joined_table.v)
 ```
 {{< /tab >}}
@@ -1103,7 +1103,7 @@ def split(x):
 
 # join
 orders = t_env.from_path("Orders")
-joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s", "t", 
"v"))
+joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s, t, v"))
 result = joined_table.select(joined_table.a, joined_table.b, joined_table.s, 
joined_table.t, joined_table.v)
 ```
 {{< /tab >}}
@@ -2453,7 +2453,7 @@ agg = udaf(function,
 # 使用 python 通用聚合函数进行聚合
 result = t.group_by(t.a) \
     .aggregate(agg.alias("c", "d")) \
-    .select(col('a'), col('c'), col('d'))
+    .select("a, c, d")
     
 # 使用 python 向量化聚合函数进行聚合
 pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
@@ -2462,7 +2462,8 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                         DataTypes.FIELD("b", DataTypes.INT())]),
                    func_type="pandas")
 t.aggregate(pandas_udaf.alias("a", "b")) \
-    .select(col('a'), col('b'))
+    .select("a, b")
+
 ```
 
 {{< /tab >}}
@@ -2514,9 +2515,9 @@ tumble_window = Tumble.over(expr.lit(1).hours) \
     .alias("w")
 t.select(t.b, t.rowtime) \
     .window(tumble_window) \
-    .group_by(col("w")) \
+    .group_by("w") \
     .aggregate(pandas_udaf.alias("d", "e")) \
-    .select(col('w').rowtime, col('d'), col('e'))
+    .select("w.rowtime, d, e")
 ```
 {{< /tab >}}
 {{< /tabs >}}
diff --git a/docs/content/docs/dev/python/table/python_table_api_connectors.md 
b/docs/content/docs/dev/python/table/python_table_api_connectors.md
index b3ee248..a7f31d3 100644
--- a/docs/content/docs/dev/python/table/python_table_api_connectors.md
+++ b/docs/content/docs/dev/python/table/python_table_api_connectors.md
@@ -143,7 +143,7 @@ import numpy as np
 
 # Create a PyFlink Table
 pdf = pd.DataFrame(np.random.rand(1000, 2))
-table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)
+table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")
 
 # Convert the PyFlink Table to a Pandas DataFrame
 pdf = table.to_pandas()
diff --git a/docs/content/docs/dev/python/table/udfs/python_udfs.md 
b/docs/content/docs/dev/python/table/udfs/python_udfs.md
index c33a51b..745124b 100644
--- a/docs/content/docs/dev/python/table/udfs/python_udfs.md
+++ b/docs/content/docs/dev/python/table/udfs/python_udfs.md
@@ -129,7 +129,7 @@ add = udf(functools.partial(partial_add, k=1), 
result_type=DataTypes.BIGINT())
 # register the Python function
 table_env.create_temporary_function("add", add)
 # use the function in Python Table API
-my_table.select(call('add', my_table.a, my_table.b))
+my_table.select("add(a, b)")
 
 # You can also use the Python function in Python Table API directly
 my_table.select(add(my_table.a, my_table.b))
@@ -158,8 +158,8 @@ my_table = ...  # type: Table, table schema: [a: String]
 split = udtf(Split(), result_types=[DataTypes.STRING(), DataTypes.INT()])
 
 # use the Python Table Function in Python Table API
-my_table.join_lateral(split(my_table.a).alias("word", "length"))
-my_table.left_outer_join_lateral(split(my_table.a).alias("word", "length"))
+my_table.join_lateral(split(my_table.a).alias("word, length"))
+my_table.left_outer_join_lateral(split(my_table.a).alias("word, length"))
 
 # use the Python Table function in SQL API
 table_env.create_temporary_function("split", udtf(Split(), 
result_types=[DataTypes.STRING(), DataTypes.INT()]))
@@ -196,8 +196,8 @@ my_table = ...  # type: Table, table schema: [a: String]
 table_env.create_java_temporary_function("split", "my.java.function.Split")
 
 # Use the table function in the Python Table API. "alias" specifies the field 
names of the table.
-my_table.join_lateral(call('split', my_table.a).alias("word", 
"length")).select(my_table.a, col('word'), col('length'))
-my_table.left_outer_join_lateral(call('split', my_table.a).alias("word", 
"length")).select(my_table.a, col('word'), col('length'))
+my_table.join_lateral(call('split', my_table.a).alias("word, 
length")).select(my_table.a, col('word'), col('length'))
+my_table.left_outer_join_lateral(call('split', my_table.a).alias("word, 
length")).select(my_table.a, col('word'), col('length'))
 
 # Register the python function.
 
@@ -338,7 +338,7 @@ tumble_window = Tumble.over(lit(1).hours) \
 
 result = t.window(tumble_window) \
         .group_by(col('w'), col('name')) \
-        .select(col('w').start, col('w').end, weighted_avg(col('value'), 
col('count'))) \
+        .select("w.start, w.end, weighted_avg(value, count)") \
         .to_pandas()
 print(result)
 
diff --git a/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md 
b/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md
index c3461b3..7b79eab 100644
--- a/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md
+++ b/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md
@@ -100,7 +100,8 @@ tumble_window = Tumble.over(expr.lit(1).hours) \
 
 my_table.window(tumble_window) \
     .group_by("w") \
-    .select(col('w').start, col('w').end, mean_udaf(col('b')))
+    .select("w.start, w.end, mean_udaf(b)")
+
 
 # use the vectorized Python aggregate function in Over Window Aggregation
 table_env.create_temporary_function("mean_udaf", mean_udaf)
diff --git a/docs/content/docs/dev/table/catalogs.md 
b/docs/content/docs/dev/table/catalogs.md
index f166ab5..d0b7ab6 100644
--- a/docs/content/docs/dev/table/catalogs.md
+++ b/docs/content/docs/dev/table/catalogs.md
@@ -245,7 +245,7 @@ schema = Schema.new_builder() \
     
 catalog_table = t_env.create_table("myhive.mydb.mytable", 
TableDescriptor.for_connector("kafka")
     .schema(schema)
-    # …
+    // …
     .build())
 
 # tables should contain "mytable"
diff --git a/docs/content/docs/dev/table/tableApi.md 
b/docs/content/docs/dev/table/tableApi.md
index 91ab6cf..d4f132c 100644
--- a/docs/content/docs/dev/table/tableApi.md
+++ b/docs/content/docs/dev/table/tableApi.md
@@ -418,7 +418,7 @@ val orders: Table = tableEnv.from("Orders").as("x", "y", 
"z", "t")
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.alias("x", "y", "z", "t")
+result = orders.alias("x, y, z, t")
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1053,7 +1053,7 @@ def split(x):
 
 # join
 orders = t_env.from_path("Orders")
-joined_table = orders.join_lateral(split(orders.c).alias("s", "t", "v"))
+joined_table = orders.join_lateral(split(orders.c).alias("s, t, v"))
 result = joined_table.select(joined_table.a, joined_table.b, joined_table.s, 
joined_table.t, joined_table.v)
 ```
 {{< /tab >}}
@@ -1102,7 +1102,7 @@ def split(x):
 
 # join
 orders = t_env.from_path("Orders")
-joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s", "t", 
"v"))
+joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s, t, v"))
 result = joined_table.select(joined_table.a, joined_table.b, joined_table.s, 
joined_table.t, joined_table.v)
 ```
 {{< /tab >}}
@@ -2452,7 +2452,7 @@ agg = udaf(function,
 # aggregate with a python general aggregate function
 result = t.group_by(t.a) \
     .aggregate(agg.alias("c", "d")) \
-    select(col('a'), col('c'), col('d'))
+    .select("a, c, d")
     
 # aggregate with a python vectorized aggregate function
 pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
@@ -2461,7 +2461,8 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                         DataTypes.FIELD("b", DataTypes.INT())]),
                    func_type="pandas")
 t.aggregate(pandas_udaf.alias("a", "b")) \
-    select(col('a'), col('b'))
+    .select("a, b")
+
 ```
 
 {{< /tab >}}
@@ -2514,9 +2515,9 @@ tumble_window = Tumble.over(expr.lit(1).hours) \
     .alias("w")
 t.select(t.b, t.rowtime) \
     .window(tumble_window) \
-    .group_by(col("w")) \
+    .group_by("w") \
     .aggregate(pandas_udaf.alias("d", "e")) \
-    .select(col('w').rowtime, col('d'), col('e'))
+    .select("w.rowtime, d, e")
 ```
 {{< /tab >}}
 {{< /tabs >}}
diff --git a/flink-python/pyflink/examples/table/pandas/pandas_udaf.py 
b/flink-python/pyflink/examples/table/pandas/pandas_udaf.py
index 4b884dd..e4e8b9e 100644
--- a/flink-python/pyflink/examples/table/pandas/pandas_udaf.py
+++ b/flink-python/pyflink/examples/table/pandas/pandas_udaf.py
@@ -55,7 +55,7 @@ def pandas_udaf():
               .column("f2", DataTypes.FLOAT())
               .watermark("ts", "ts - INTERVAL '3' SECOND")
               .build()
-    ).alias("ts", "name", "price")
+    ).alias("ts, name, price")
 
     # define the sink
     t_env.create_temporary_table(
diff --git a/flink-python/pyflink/examples/table/windowing/over_window.py 
b/flink-python/pyflink/examples/table/windowing/over_window.py
index 5fd736b..982d6b8 100644
--- a/flink-python/pyflink/examples/table/windowing/over_window.py
+++ b/flink-python/pyflink/examples/table/windowing/over_window.py
@@ -54,7 +54,7 @@ def tumble_window_demo():
               .column("f2", DataTypes.FLOAT())
               .watermark("ts", "ts - INTERVAL '3' SECOND")
               .build()
-    ).alias("ts", "name", "price")
+    ).alias("ts, name, price")
 
     # define the sink
     t_env.create_temporary_table(
@@ -68,8 +68,8 @@ def tumble_window_demo():
 
     # define the over window operation
     table = table.over_window(
-        Over.partition_by(col("name"))
-            .order_by(col("ts"))
+        Over.partition_by("name")
+            .order_by("ts")
             .preceding(row_interval(2))
             .following(CURRENT_ROW)
             .alias('w')) \
diff --git a/flink-python/pyflink/examples/table/windowing/session_window.py 
b/flink-python/pyflink/examples/table/windowing/session_window.py
index 49e4680..5b40a7b 100644
--- a/flink-python/pyflink/examples/table/windowing/session_window.py
+++ b/flink-python/pyflink/examples/table/windowing/session_window.py
@@ -52,7 +52,7 @@ def session_window_demo():
               .column("f2", DataTypes.FLOAT())
               .watermark("ts", "ts - INTERVAL '3' SECOND")
               .build()
-    ).alias("ts", "name", "price")
+    ).alias("ts, name, price")
 
     # define the sink
     t_env.create_temporary_table(
diff --git a/flink-python/pyflink/examples/table/windowing/sliding_window.py 
b/flink-python/pyflink/examples/table/windowing/sliding_window.py
index fc460c6..1b8bb15 100644
--- a/flink-python/pyflink/examples/table/windowing/sliding_window.py
+++ b/flink-python/pyflink/examples/table/windowing/sliding_window.py
@@ -54,7 +54,7 @@ def sliding_window_demo():
               .column("f2", DataTypes.FLOAT())
               .watermark("ts", "ts - INTERVAL '3' SECOND")
               .build()
-    ).alias("ts", "name", "price")
+    ).alias("ts, name, price")
 
     # define the sink
     t_env.create_temporary_table(
diff --git a/flink-python/pyflink/examples/table/windowing/tumble_window.py 
b/flink-python/pyflink/examples/table/windowing/tumble_window.py
index c778747..dd3ba2e 100644
--- a/flink-python/pyflink/examples/table/windowing/tumble_window.py
+++ b/flink-python/pyflink/examples/table/windowing/tumble_window.py
@@ -54,7 +54,7 @@ def tumble_window_demo():
               .column("f2", DataTypes.FLOAT())
               .watermark("ts", "ts - INTERVAL '3' SECOND")
               .build()
-    ).alias("ts", "name", "price")
+    ).alias("ts, name, price")
 
     # define the sink
     t_env.create_temporary_table(
diff --git a/flink-python/pyflink/table/table.py 
b/flink-python/pyflink/table/table.py
index ed46d8a..b7d8ae6 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -90,7 +90,7 @@ class Table(object):
                 % (name, ', '.join(self.get_schema().get_field_names())))
         return col(name)
 
-    def select(self, *fields: Expression) -> 'Table':
+    def select(self, *fields: Union[str, Expression]) -> 'Table':
         """
         Performs a selection operation. Similar to a SQL SELECT statement. The 
field expressions
         can contain complex expressions.
@@ -102,9 +102,16 @@ class Table(object):
             >>> tab.select(tab.key, expr.concat(tab.value, 'hello'))
             >>> tab.select(expr.col('key'), expr.concat(expr.col('value'), 
'hello'))
 
+            >>> tab.select("key, value + 'hello'")
+
         :return: The result table.
         """
-        return Table(self._j_table.select(to_expression_jarray(fields)), 
self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return Table(self._j_table.select(to_expression_jarray(fields)), 
self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.select(fields[0]), self._t_env)
 
     def alias(self, field: str, *fields: str) -> 'Table':
         """
@@ -115,6 +122,7 @@ class Table(object):
         ::
 
             >>> tab.alias("a", "b", "c")
+            >>> tab.alias("a, b, c")
 
         :param field: Field alias.
         :param fields: Additional field aliases.
@@ -133,6 +141,7 @@ class Table(object):
         ::
 
             >>> tab.filter(tab.name == 'Fred')
+            >>> tab.filter("name = 'Fred'")
 
         :param predicate: Predicate expression string.
         :return: The result table.
@@ -148,13 +157,14 @@ class Table(object):
         ::
 
             >>> tab.where(tab.name == 'Fred')
+            >>> tab.where("name = 'Fred'")
 
         :param predicate: Predicate expression string.
         :return: The result table.
         """
         return Table(self._j_table.where(_get_java_expression(predicate)), 
self._t_env)
 
-    def group_by(self, *fields: Expression) -> 'GroupedTable':
+    def group_by(self, *fields: Union[str, Expression]) -> 'GroupedTable':
         """
         Groups the elements on some grouping keys. Use this before a selection 
with aggregations
         to perform the aggregation on a per-group basis. Similar to a SQL 
GROUP BY statement.
@@ -163,11 +173,17 @@ class Table(object):
         ::
 
             >>> tab.group_by(tab.key).select(tab.key, tab.value.avg)
+            >>> tab.group_by("key").select("key, value.avg")
 
         :param fields: Group keys.
         :return: The grouped table.
         """
-        return 
GroupedTable(self._j_table.groupBy(to_expression_jarray(fields)), self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return 
GroupedTable(self._j_table.groupBy(to_expression_jarray(fields)), self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return GroupedTable(self._j_table.groupBy(fields[0]), self._t_env)
 
     def distinct(self) -> 'Table':
         """
@@ -212,7 +228,7 @@ class Table(object):
 
     def left_outer_join(self,
                         right: 'Table',
-                        join_predicate: Expression[bool] = None) -> 'Table':
+                        join_predicate: Union[str, Expression[bool]] = None) 
-> 'Table':
         """
         Joins two :class:`~pyflink.table.Table`. Similar to a SQL left outer 
join. The fields of
         the two joined operations must not overlap, use 
:func:`~pyflink.table.Table.alias` to
@@ -228,6 +244,7 @@ class Table(object):
 
             >>> left.left_outer_join(right)
             >>> left.left_outer_join(right, left.a == right.b)
+            >>> left.left_outer_join(right, "a = b")
 
         :param right: Right table.
         :param join_predicate: Optional, the join predicate expression string.
@@ -256,6 +273,7 @@ class Table(object):
         ::
 
             >>> left.right_outer_join(right, left.a == right.b)
+            >>> left.right_outer_join(right, "a = b")
 
         :param right: Right table.
         :param join_predicate: The join predicate expression string.
@@ -281,6 +299,7 @@ class Table(object):
         ::
 
             >>> left.full_outer_join(right, left.a == right.b)
+            >>> left.full_outer_join(right, "a = b")
 
         :param right: Right table.
         :param join_predicate: The join predicate expression string.
@@ -301,7 +320,8 @@ class Table(object):
         ::
 
             >>> t_env.create_java_temporary_system_function("split",
-            ...     "java.table.function.class.name")
+           ...     "java.table.function.class.name")
+            >>> tab.join_lateral("split(text, ' ') as (b)", "a = b")
 
             >>> from pyflink.table import expressions as expr
             >>> tab.join_lateral(expr.call('split', ' ').alias('b'), 
expr.col('a') == expr.col('b'))
@@ -350,6 +370,7 @@ class Table(object):
 
             >>> t_env.create_java_temporary_system_function("split",
             ...     "java.table.function.class.name")
+            >>> tab.left_outer_join_lateral("split(text, ' ') as (b)")
             >>> from pyflink.table import expressions as expr
             >>> tab.left_outer_join_lateral(expr.call('split', ' ').alias('b'))
             >>> # take all the columns as inputs
@@ -507,7 +528,7 @@ class Table(object):
         """
         return Table(self._j_table.intersectAll(right._j_table), self._t_env)
 
-    def order_by(self, *fields: Expression) -> 'Table':
+    def order_by(self, *fields: Union[str, Expression]) -> 'Table':
         """
         Sorts the given :class:`~pyflink.table.Table`. Similar to SQL ORDER BY.
         The resulting Table is sorted globally sorted across all parallel 
partitions.
@@ -516,6 +537,7 @@ class Table(object):
         ::
 
             >>> tab.order_by(tab.name.desc)
+            >>> tab.order_by("name.desc")
 
         For unbounded tables, this operation requires a sorting on a time 
attribute or a subsequent
         fetch operation.
@@ -523,7 +545,12 @@ class Table(object):
         :param fields: Order fields expression string.
         :return: The result table.
         """
-        return Table(self._j_table.orderBy(to_expression_jarray(fields)), 
self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return Table(self._j_table.orderBy(to_expression_jarray(fields)), 
self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.orderBy(fields[0]), self._t_env)
 
     def offset(self, offset: int) -> 'Table':
         """
@@ -538,6 +565,7 @@ class Table(object):
 
             # skips the first 3 rows and returns all following rows.
             >>> tab.order_by(tab.name.desc).offset(3)
+            >>> tab.order_by("name.desc").offset(3)
             # skips the first 10 rows and returns the next 5 rows.
             >>> tab.order_by(tab.name.desc).offset(10).fetch(5)
 
@@ -562,6 +590,7 @@ class Table(object):
         ::
 
             >>> tab.order_by(tab.name.desc).fetch(3)
+            >>> tab.order_by("name.desc").fetch(3)
 
         Skips the first 10 rows and returns the next 5 rows.
         ::
@@ -670,7 +699,7 @@ class Table(object):
                                  [item._java_over_window for item in 
over_windows])
         return OverWindowedTable(self._j_table.window(window_array), 
self._t_env)
 
-    def add_columns(self, *fields: Expression) -> 'Table':
+    def add_columns(self, *fields: Union[str, Expression]) -> 'Table':
         """
         Adds additional columns. Similar to a SQL SELECT statement. The field 
expressions
         can contain complex expressions, but can not contain aggregations. It 
will throw an
@@ -681,13 +710,19 @@ class Table(object):
 
             >>> from pyflink.table import expressions as expr
             >>> tab.add_columns((tab.a + 1).alias('a1'), expr.concat(tab.b, 
'sunny').alias('b1'))
+            >>> tab.add_columns("a + 1 as a1, concat(b, 'sunny') as b1")
 
         :param fields: Column list string.
         :return: The result table.
         """
-        return Table(self._j_table.addColumns(to_expression_jarray(fields)), 
self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return 
Table(self._j_table.addColumns(to_expression_jarray(fields)), self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.addColumns(fields[0]), self._t_env)
 
-    def add_or_replace_columns(self, *fields: Expression) -> 'Table':
+    def add_or_replace_columns(self, *fields: Union[str, Expression]) -> 
'Table':
         """
         Adds additional columns. Similar to a SQL SELECT statement. The field 
expressions
         can contain complex expressions, but can not contain aggregations. 
Existing fields will be
@@ -700,14 +735,20 @@ class Table(object):
             >>> from pyflink.table import expressions as expr
             >>> tab.add_or_replace_columns((tab.a + 1).alias('a1'),
             ...                            expr.concat(tab.b, 
'sunny').alias('b1'))
+            >>> tab.add_or_replace_columns("a + 1 as a1, concat(b, 'sunny') as 
b1")
 
         :param fields: Column list string.
         :return: The result table.
         """
-        return 
Table(self._j_table.addOrReplaceColumns(to_expression_jarray(fields)),
-                     self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return 
Table(self._j_table.addOrReplaceColumns(to_expression_jarray(fields)),
+                         self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.addOrReplaceColumns(fields[0]), 
self._t_env)
 
-    def rename_columns(self, *fields: Expression) -> 'Table':
+    def rename_columns(self, *fields: Union[str, Expression]) -> 'Table':
         """
         Renames existing columns. Similar to a field alias statement. The 
field expressions
         should be alias expressions, and only the existing fields can be 
renamed.
@@ -716,14 +757,20 @@ class Table(object):
         ::
 
             >>> tab.rename_columns(tab.a.alias('a1'), tab.b.alias('b1'))
+            >>> tab.rename_columns("a as a1, b as b1")
 
         :param fields: Column list string.
         :return: The result table.
         """
-        return Table(self._j_table.renameColumns(to_expression_jarray(fields)),
-                     self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return 
Table(self._j_table.renameColumns(to_expression_jarray(fields)),
+                         self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.renameColumns(fields[0]), self._t_env)
 
-    def drop_columns(self, *fields: Expression) -> 'Table':
+    def drop_columns(self, *fields: Union[str, Expression]) -> 'Table':
         """
         Drops existing columns. The field expressions should be field 
reference expressions.
 
@@ -731,14 +778,20 @@ class Table(object):
         ::
 
             >>> tab.drop_columns(tab.a, tab.b)
+            >>> tab.drop_columns("a, b")
 
         :param fields: Column list string.
         :return: The result table.
         """
-        return Table(self._j_table.dropColumns(to_expression_jarray(fields)),
-                     self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return 
Table(self._j_table.dropColumns(to_expression_jarray(fields)),
+                         self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.dropColumns(fields[0]), self._t_env)
 
-    def map(self, func: Union[Expression, UserDefinedScalarFunctionWrapper]) 
-> 'Table':
+    def map(self, func: Union[str, Expression, 
UserDefinedScalarFunctionWrapper]) -> 'Table':
         """
         Performs a map operation with a user-defined scalar function.
 
@@ -758,13 +811,15 @@ class Table(object):
 
         .. versionadded:: 1.13.0
         """
-        if isinstance(func, Expression):
+        if isinstance(func, str):
+            return Table(self._j_table.map(func), self._t_env)
+        elif isinstance(func, Expression):
             return Table(self._j_table.map(func._j_expr), self._t_env)
         else:
             func._set_takes_row_as_input()
             return 
Table(self._j_table.map(func(with_columns(col("*")))._j_expr), self._t_env)
 
-    def flat_map(self, func: Union[Expression, 
UserDefinedTableFunctionWrapper]) -> 'Table':
+    def flat_map(self, func: Union[str, Expression, 
UserDefinedTableFunctionWrapper]) -> 'Table':
         """
         Performs a flatMap operation with a user-defined table function.
 
@@ -788,13 +843,15 @@ class Table(object):
 
         .. versionadded:: 1.13.0
         """
-        if isinstance(func, Expression):
+        if isinstance(func, str):
+            return Table(self._j_table.flatMap(func), self._t_env)
+        elif isinstance(func, Expression):
             return Table(self._j_table.flatMap(func._j_expr), self._t_env)
         else:
             func._set_takes_row_as_input()
             return 
Table(self._j_table.flatMap(func(with_columns(col("*")))._j_expr), self._t_env)
 
-    def aggregate(self, func: Union[Expression, 
UserDefinedAggregateFunctionWrapper]) \
+    def aggregate(self, func: Union[str, Expression, 
UserDefinedAggregateFunctionWrapper]) \
             -> 'AggregatedTable':
         """
         Performs a global aggregate operation with an aggregate function. You 
have to close the
@@ -808,7 +865,7 @@ class Table(object):
             ...                   [DataTypes.FIELD("a", DataTypes.FLOAT()),
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
-            >>> tab.aggregate(agg(tab.a).alias("a", "b")).select(col('a'), 
col('b'))
+            >>> tab.aggregate(agg(tab.a).alias("a", "b")).select("a, b")
             >>> # take all the columns as inputs
             >>> # pd is a Pandas.DataFrame
             >>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.a.max()),
@@ -816,14 +873,16 @@ class Table(object):
             ...                   [DataTypes.FIELD("a", DataTypes.FLOAT()),
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
-            >>> tab.aggregate(agg.alias("a, b")).select(col('a'), col('b'))
+            >>> tab.aggregate(agg.alias("a, b")).select("a, b")
 
         :param func: user-defined aggregate function.
         :return: The result table.
 
         .. versionadded:: 1.13.0
         """
-        if isinstance(func, Expression):
+        if isinstance(func, str):
+            return AggregatedTable(self._j_table.aggregate(func), self._t_env)
+        elif isinstance(func, Expression):
             return AggregatedTable(self._j_table.aggregate(func._j_expr), 
self._t_env)
         else:
             func._set_takes_row_as_input()
@@ -834,7 +893,7 @@ class Table(object):
                 func = func(with_columns(col("*")))
             return AggregatedTable(self._j_table.aggregate(func._j_expr), 
self._t_env)
 
-    def flat_aggregate(self, func: Union[Expression, 
UserDefinedAggregateFunctionWrapper]) \
+    def flat_aggregate(self, func: Union[str, Expression, 
UserDefinedAggregateFunctionWrapper]) \
             -> 'FlatAggregateTable':
         """
         Perform a global flat_aggregate without group_by. flat_aggregate takes 
a
@@ -845,7 +904,7 @@ class Table(object):
         ::
 
             >>> table_agg = udtaf(MyTableAggregateFunction())
-            >>> tab.flat_aggregate(table_agg(tab.a).alias("a", 
"b")).select(col('a'), col('b'))
+            >>> tab.flat_aggregate(table_agg(tab.a).alias("a", 
"b")).select("a, b")
             >>> # take all the columns as inputs
             >>> class Top2(TableAggregateFunction):
             ...     def emit_value(self, accumulator):
@@ -871,14 +930,16 @@ class Table(object):
             ...         return DataTypes.ROW(
             ...             [DataTypes.FIELD("a", DataTypes.BIGINT())])
             >>> top2 = udtaf(Top2())
-            >>> tab.flat_aggregate(top2.alias("a", "b")).select(col('a'), 
col('b'))
+            >>> tab.flat_aggregate(top2.alias("a", "b")).select("a, b")
 
         :param func: user-defined table aggregate function.
         :return: The result table.
 
         .. versionadded:: 1.13.0
         """
-        if isinstance(func, Expression):
+        if isinstance(func, str):
+            return FlatAggregateTable(self._j_table.flatAggregate(func), 
self._t_env)
+        elif isinstance(func, Expression):
             return 
FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
         else:
             func._set_takes_row_as_input()
@@ -1063,7 +1124,7 @@ class GroupedTable(object):
         self._j_table = java_table
         self._t_env = t_env
 
-    def select(self, *fields: Expression) -> 'Table':
+    def select(self, *fields: Union[str, Expression]) -> 'Table':
         """
         Performs a selection operation on a grouped table. Similar to an SQL 
SELECT statement.
         The field expressions can contain complex expressions and aggregations.
@@ -1072,13 +1133,20 @@ class GroupedTable(object):
         ::
 
             >>> tab.group_by(tab.key).select(tab.key, 
tab.value.avg.alias('average'))
+            >>> tab.group_by("key").select("key, value.avg as average")
+
 
         :param fields: Expression string that contains group keys and 
aggregate function calls.
         :return: The result table.
         """
-        return Table(self._j_table.select(to_expression_jarray(fields)), 
self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return Table(self._j_table.select(to_expression_jarray(fields)), 
self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.select(fields[0]), self._t_env)
 
-    def aggregate(self, func: Union[Expression, 
UserDefinedAggregateFunctionWrapper]) \
+    def aggregate(self, func: Union[str, Expression, 
UserDefinedAggregateFunctionWrapper]) \
             -> 'AggregatedTable':
         """
         Performs a aggregate operation with an aggregate function. You have to 
close the
@@ -1092,8 +1160,7 @@ class GroupedTable(object):
             ...                   [DataTypes.FIELD("a", DataTypes.FLOAT()),
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
-            >>> tab.group_by(tab.a).aggregate(agg(tab.b).alias("c", 
"d")).select(
-            ...     col('a'), col('c'), col('d'))
+            >>> tab.group_by(tab.a).aggregate(agg(tab.b).alias("c", 
"d")).select("a, c, d")
             >>> # take all the columns as inputs
             >>> # pd is a Pandas.DataFrame
             >>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.b.max()),
@@ -1101,14 +1168,16 @@ class GroupedTable(object):
             ...                   [DataTypes.FIELD("a", DataTypes.FLOAT()),
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
-            >>> tab.group_by(tab.a).aggregate(agg.alias("a, 
b")).select(col('a'), col('b'))
+            >>> tab.group_by(tab.a).aggregate(agg.alias("a, b")).select("a, b")
 
         :param func: user-defined aggregate function.
         :return: The result table.
 
         .. versionadded:: 1.13.0
         """
-        if isinstance(func, Expression):
+        if isinstance(func, str):
+            return AggregatedTable(self._j_table.aggregate(func), self._t_env)
+        elif isinstance(func, Expression):
             return AggregatedTable(self._j_table.aggregate(func._j_expr), 
self._t_env)
         else:
             func._set_takes_row_as_input()
@@ -1119,7 +1188,7 @@ class GroupedTable(object):
                 func = func(with_columns(col("*")))
             return AggregatedTable(self._j_table.aggregate(func._j_expr), 
self._t_env)
 
-    def flat_aggregate(self, func: Union[Expression, 
UserDefinedAggregateFunctionWrapper]) \
+    def flat_aggregate(self, func: Union[str, Expression, 
UserDefinedAggregateFunctionWrapper]) \
             -> 'FlatAggregateTable':
         """
         Performs a flat_aggregate operation on a grouped table. flat_aggregate 
takes a
@@ -1130,8 +1199,7 @@ class GroupedTable(object):
         ::
 
             >>> table_agg = udtaf(MyTableAggregateFunction())
-            >>> 
tab.group_by(tab.c).flat_aggregate(table_agg(tab.a).alias("a")).select(
-            ...     col('c'), col('a'))
+            >>> 
tab.group_by(tab.c).flat_aggregate(table_agg(tab.a).alias("a")).select("c, a")
             >>> # take all the columns as inputs
             >>> class Top2(TableAggregateFunction):
             ...     def emit_value(self, accumulator):
@@ -1157,14 +1225,16 @@ class GroupedTable(object):
             ...         return DataTypes.ROW(
             ...             [DataTypes.FIELD("a", DataTypes.BIGINT())])
             >>> top2 = udtaf(Top2())
-            >>> tab.group_by(tab.c).flat_aggregate(top2.alias("a", 
"b")).select(col('a'), col('b'))
+            >>> tab.group_by(tab.c).flat_aggregate(top2.alias("a", 
"b")).select("a, b")
 
         :param func: user-defined table aggregate function.
         :return: The result table.
 
         .. versionadded:: 1.13.0
         """
-        if isinstance(func, Expression):
+        if isinstance(func, str):
+            return FlatAggregateTable(self._j_table.flatAggregate(func), 
self._t_env)
+        elif isinstance(func, Expression):
             return 
FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
         else:
             func._set_takes_row_as_input()
@@ -1185,7 +1255,7 @@ class GroupWindowedTable(object):
         self._j_table = java_group_windowed_table
         self._t_env = t_env
 
-    def group_by(self, *fields: Expression) -> 'WindowGroupedTable':
+    def group_by(self, *fields: Union[str, Expression]) -> 
'WindowGroupedTable':
         """
         Groups the elements by a mandatory window and one or more optional 
grouping attributes.
         The window is specified by referring to its alias.
@@ -1211,8 +1281,13 @@ class GroupWindowedTable(object):
         :param fields: Group keys.
         :return: A window grouped table.
         """
-        return WindowGroupedTable(
-            self._j_table.groupBy(to_expression_jarray(fields)), self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return WindowGroupedTable(
+                self._j_table.groupBy(to_expression_jarray(fields)), 
self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return WindowGroupedTable(self._j_table.groupBy(fields[0]), 
self._t_env)
 
 
 class WindowGroupedTable(object):
@@ -1224,7 +1299,7 @@ class WindowGroupedTable(object):
         self._j_table = java_window_grouped_table
         self._t_env = t_env
 
-    def select(self, *fields: Expression) -> 'Table':
+    def select(self, *fields: Union[str, Expression]) -> 'Table':
         """
         Performs a selection operation on a window grouped table. Similar to 
an SQL SELECT
         statement.
@@ -1236,6 +1311,7 @@ class WindowGroupedTable(object):
             >>> window_grouped_table.select(col('key'),
             ...                             col('window').start,
             ...                             col('value').avg.alias('valavg'))
+            >>> window_grouped_table.select("key, window.start, value.avg as 
valavg")
 
         :param fields: Expression string.
         :return: The result table.
@@ -1247,7 +1323,7 @@ class WindowGroupedTable(object):
             assert isinstance(fields[0], str)
             return Table(self._j_table.select(fields[0]), self._t_env)
 
-    def aggregate(self, func: Union[Expression, 
UserDefinedAggregateFunctionWrapper]) \
+    def aggregate(self, func: Union[str, Expression, 
UserDefinedAggregateFunctionWrapper]) \
             -> 'AggregatedTable':
         """
         Performs an aggregate operation on a window grouped table. You have to 
close the
@@ -1264,7 +1340,7 @@ class WindowGroupedTable(object):
             >>> window_grouped_table.group_by("w") \
             ...     .aggregate(agg(window_grouped_table.b) \
             ...     .alias("c", "d")) \
-            ...     .select(col('c'), col('d'))
+            ...     .select("c, d")
             >>> # take all the columns as inputs
             >>> # pd is a Pandas.DataFrame
             >>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.b.max()),
@@ -1279,7 +1355,9 @@ class WindowGroupedTable(object):
 
         .. versionadded:: 1.13.0
         """
-        if isinstance(func, Expression):
+        if isinstance(func, str):
+            return AggregatedTable(self._j_table.aggregate(func), self._t_env)
+        elif isinstance(func, Expression):
             return AggregatedTable(self._j_table.aggregate(func._j_expr), 
self._t_env)
         else:
             func._set_takes_row_as_input()
@@ -1313,7 +1391,7 @@ class OverWindowedTable(object):
         self._j_table = java_over_windowed_table
         self._t_env = t_env
 
-    def select(self, *fields: Expression) -> 'Table':
+    def select(self, *fields: Union[str, Expression]) -> 'Table':
         """
         Performs a selection operation on a over windowed table. Similar to an 
SQL SELECT
         statement.
@@ -1325,11 +1403,17 @@ class OverWindowedTable(object):
             >>> over_windowed_table.select(col('c'),
             ...                            col('b').count.over(col('ow')),
             ...                            col('e').sum.over(col('ow')))
+            >>> over_windowed_table.select("c, b.count over ow, e.sum over ow")
 
         :param fields: Expression string.
         :return: The result table.
         """
-        return Table(self._j_table.select(to_expression_jarray(fields)), 
self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return Table(self._j_table.select(to_expression_jarray(fields)), 
self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.select(fields[0]), self._t_env)
 
 
 class AggregatedTable(object):
@@ -1341,7 +1425,7 @@ class AggregatedTable(object):
         self._j_table = java_table
         self._t_env = t_env
 
-    def select(self, *fields: Expression) -> 'Table':
+    def select(self, *fields: Union[str, Expression]) -> 'Table':
         """
         Performs a selection operation after an aggregate operation. The field 
expressions
         cannot contain table functions and aggregations.
@@ -1354,7 +1438,7 @@ class AggregatedTable(object):
             ...                   [DataTypes.FIELD("a", DataTypes.FLOAT()),
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
-            >>> tab.aggregate(agg(tab.a).alias("a", "b")).select(col('a'), 
col('b'))
+            >>> tab.aggregate(agg(tab.a).alias("a", "b")).select("a, b")
             >>> # take all the columns as inputs
             >>> # pd is a Pandas.DataFrame
             >>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.b.max()),
@@ -1362,12 +1446,17 @@ class AggregatedTable(object):
             ...                   [DataTypes.FIELD("a", DataTypes.FLOAT()),
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
-            >>> tab.group_by(tab.a).aggregate(agg.alias("a, 
b")).select(col('a'), col('b'))
+            >>> tab.group_by(tab.a).aggregate(agg.alias("a, b")).select("a, b")
 
         :param fields: Expression string.
         :return: The result table.
         """
-        return Table(self._j_table.select(to_expression_jarray(fields)), 
self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return Table(self._j_table.select(to_expression_jarray(fields)), 
self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.select(fields[0]), self._t_env)
 
 
 class FlatAggregateTable(object):
@@ -1380,7 +1469,7 @@ class FlatAggregateTable(object):
         self._j_table = java_table
         self._t_env = t_env
 
-    def select(self, *fields: Expression) -> 'Table':
+    def select(self, *fields: Union[str, Expression]) -> 'Table':
         """
         Performs a selection operation on a FlatAggregateTable. Similar to a 
SQL SELECT statement.
         The field expressions can contain complex expressions.
@@ -1389,7 +1478,7 @@ class FlatAggregateTable(object):
         ::
 
             >>> table_agg = udtaf(MyTableAggregateFunction())
-            >>> tab.flat_aggregate(table_agg(tab.a).alias("a", 
"b")).select(col('a'), col('b'))
+            >>> tab.flat_aggregate(table_agg(tab.a).alias("a", 
"b")).select("a, b")
             >>> # take all the columns as inputs
             >>> class Top2(TableAggregateFunction):
             ...     def emit_value(self, accumulator):
@@ -1415,9 +1504,14 @@ class FlatAggregateTable(object):
             ...         return DataTypes.ROW(
             ...             [DataTypes.FIELD("a", DataTypes.BIGINT())])
             >>> top2 = udtaf(Top2())
-            >>> tab.group_by(tab.c).flat_aggregate(top2.alias("a", 
"b")).select(col('a'), col('b'))
+            >>> tab.group_by(tab.c).flat_aggregate(top2.alias("a", 
"b")).select("a, b")
 
         :param fields: Expression string.
         :return: The result table.
         """
-        return Table(self._j_table.select(to_expression_jarray(fields)), 
self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return Table(self._j_table.select(to_expression_jarray(fields)), 
self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.select(fields[0]), self._t_env)
diff --git a/flink-python/pyflink/table/window.py 
b/flink-python/pyflink/table/window.py
index 630455e..282660c 100644
--- a/flink-python/pyflink/table/window.py
+++ b/flink-python/pyflink/table/window.py
@@ -144,6 +144,9 @@ class Session(object):
         >>> Session.with_gap(expr.lit(10).minutes)
         ...        .on(expr.col("rowtime"))
         ...        .alias("w")
+
+        >>> Session.with_gap("10.minutes").on("rowtime").alias("w")
+
     """
 
     @classmethod
@@ -227,6 +230,8 @@ class Slide(object):
         ...      .every(expr.lit(5).minutes)
         ...      .on(expr.col("rowtime"))
         ...      .alias("w")
+
+        >>> 
Slide.over("10.minutes").every("5.minutes").on("rowtime").alias("w")
     """
 
     @classmethod
@@ -333,6 +338,8 @@ class Over(object):
         ...     .order_by(col("rowtime")) \\
         ...     .preceding(expr.UNBOUNDED_RANGE) \\
         ...     .alias("w")
+
+        >>> 
Over.partition_by("a").order_by("rowtime").preceding("unbounded_range").alias("w")
     """
 
     @classmethod

Reply via email to