This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new bc0cdc5d630 [FLINK-27545][python][examples] Update the example in
PyFlink shell
bc0cdc5d630 is described below
commit bc0cdc5d63068543c0a91b6977e02de659ebd080
Author: Dian Fu <[email protected]>
AuthorDate: Mon May 9 10:37:42 2022 +0800
[FLINK-27545][python][examples] Update the example in PyFlink shell
This closes #19673.
---
.../docs/deployment/repls/python_shell.md | 4 +-
flink-python/pyflink/shell.py | 63 ++++++++++++----------
.../pyflink/table/tests/test_shell_example.py | 3 +-
3 files changed, 39 insertions(+), 31 deletions(-)
diff --git a/docs/content.zh/docs/deployment/repls/python_shell.md
b/docs/content.zh/docs/deployment/repls/python_shell.md
index f4b283ab37e..499e39aeab0 100644
--- a/docs/content.zh/docs/deployment/repls/python_shell.md
+++ b/docs/content.zh/docs/deployment/repls/python_shell.md
@@ -80,7 +80,7 @@ $ pyflink-shell.sh local
... .option("field-delimiter", ",")
... .build())
... .build())
->>> t.select("a + 1, b, c")\
+>>> t.select(col('a') + 1, col('b'), col('c'))\
... .execute_insert("stream_sink").wait()
>>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
>>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
@@ -111,7 +111,7 @@ $ pyflink-shell.sh local
... .option("field-delimiter", ",")
... .build())
... .build())
->>> t.select("a + 1, b, c")\
+>>> t.select(col('a') + 1, col('b'), col('c'))\
... .execute_insert("batch_sink").wait()
>>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
>>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
diff --git a/flink-python/pyflink/shell.py b/flink-python/pyflink/shell.py
index 334022a7c64..ce81574c219 100644
--- a/flink-python/pyflink/shell.py
+++ b/flink-python/pyflink/shell.py
@@ -74,34 +74,41 @@ NOTE: Use the prebound Table Environment to implement batch
or streaming Table p
Streaming - Use 's_env' and 'st_env' variables
- *
- * import tempfile
- * import os
- * import shutil
- * sink_path = tempfile.gettempdir() + '/streaming.csv'
- * if os.path.exists(sink_path):
- * if os.path.isfile(sink_path):
- * os.remove(sink_path)
- * else:
- * shutil.rmtree(sink_path)
- * s_env.set_parallelism(1)
- * t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a',
'b', 'c'])
- *
- * st_env.create_temporary_table("stream_sink",
TableDescriptor.for_connector("filesystem")
- * .schema(Schema.new_builder()
- * .column("a", DataTypes.BIGINT())
- * .column("b", DataTypes.STRING())
- * .column("c", DataTypes.STRING())
- * .build())
- * .option("path", sink_path)
- * .format(FormatDescriptor.for_format("csv")
- * .option("field-delimiter", ",")
- * .build())
- * .build())
- *
- * t.select("a + 1, b, c").insert_into("stream_sink")
- *
- * st_env.execute("stream_job")
+```
+import os
+import shutil
+import tempfile
+
+sink_path = tempfile.gettempdir() + '/streaming.csv'
+if os.path.exists(sink_path):
+ if os.path.isfile(sink_path):
+ os.remove(sink_path)
+ else:
+ shutil.rmtree(sink_path)
+
+s_env.set_parallelism(1)
+t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b',
'c'])
+
+st_env.create_temporary_table("stream_sink",
TableDescriptor.for_connector("filesystem")
+ .schema(Schema.new_builder()
+ .column("a", DataTypes.BIGINT())
+ .column("b", DataTypes.STRING())
+ .column("c", DataTypes.STRING())
+ .build())
+ .option("path", sink_path)
+ .format(FormatDescriptor.for_format("csv")
+ .option("field-delimiter", ",")
+ .build())
+ .build())
+
+t.select(col('a') + 1, col('b'), col('c')).insert_into("stream_sink")
+st_env.execute("stream_job")
+
+# show the results
+with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
+ print(f.read())
+
+```
'''
utf8_out.write(welcome_msg)
diff --git a/flink-python/pyflink/table/tests/test_shell_example.py
b/flink-python/pyflink/table/tests/test_shell_example.py
index 4ad54f2be91..2fa9de2ca78 100644
--- a/flink-python/pyflink/table/tests/test_shell_example.py
+++ b/flink-python/pyflink/table/tests/test_shell_example.py
@@ -53,7 +53,8 @@ class ShellExampleTests(PyFlinkTestCase):
.build())
.build())
- t.select("a + 1, b, c").execute_insert("stream_sink").wait()
+ from pyflink.table.expressions import col
+ t.select(col('a') + 1, col('b'),
col('c')).execute_insert("stream_sink").wait()
# verify code, do not copy these code to shell.py
with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f: