This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new bc0cdc5d630 [FLINK-27545][python][examples] Update the example in PyFlink shell
bc0cdc5d630 is described below

commit bc0cdc5d63068543c0a91b6977e02de659ebd080
Author: Dian Fu <[email protected]>
AuthorDate: Mon May 9 10:37:42 2022 +0800

    [FLINK-27545][python][examples] Update the example in PyFlink shell
    
    This closes #19673.
---
 .../docs/deployment/repls/python_shell.md          |  4 +-
 flink-python/pyflink/shell.py                      | 63 ++++++++++++----------
 .../pyflink/table/tests/test_shell_example.py      |  3 +-
 3 files changed, 39 insertions(+), 31 deletions(-)

diff --git a/docs/content.zh/docs/deployment/repls/python_shell.md b/docs/content.zh/docs/deployment/repls/python_shell.md
index f4b283ab37e..499e39aeab0 100644
--- a/docs/content.zh/docs/deployment/repls/python_shell.md
+++ b/docs/content.zh/docs/deployment/repls/python_shell.md
@@ -80,7 +80,7 @@ $ pyflink-shell.sh local
 ...         .option("field-delimiter", ",")
 ...         .build())
 ...     .build())
->>> t.select("a + 1, b, c")\
+>>> t.select(col('a') + 1, col('b'), col('c'))\
 ...     .execute_insert("stream_sink").wait()
 >>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
 >>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
@@ -111,7 +111,7 @@ $ pyflink-shell.sh local
 ...         .option("field-delimiter", ",")
 ...         .build())
 ...     .build())
->>> t.select("a + 1, b, c")\
+>>> t.select(col('a') + 1, col('b'), col('c'))\
 ...     .execute_insert("batch_sink").wait()
 >>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
 >>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
diff --git a/flink-python/pyflink/shell.py b/flink-python/pyflink/shell.py
index 334022a7c64..ce81574c219 100644
--- a/flink-python/pyflink/shell.py
+++ b/flink-python/pyflink/shell.py
@@ -74,34 +74,41 @@ NOTE: Use the prebound Table Environment to implement batch or streaming Table p
 
   Streaming - Use 's_env' and 'st_env' variables
 
-    *
-    * import tempfile
-    * import os
-    * import shutil
-    * sink_path = tempfile.gettempdir() + '/streaming.csv'
-    * if os.path.exists(sink_path):
-    *     if os.path.isfile(sink_path):
-    *         os.remove(sink_path)
-    *     else:
-    *         shutil.rmtree(sink_path)
-    * s_env.set_parallelism(1)
-    * t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
-    *
-    * st_env.create_temporary_table("stream_sink", TableDescriptor.for_connector("filesystem")
-    *                               .schema(Schema.new_builder()
-    *                                       .column("a", DataTypes.BIGINT())
-    *                                       .column("b", DataTypes.STRING())
-    *                                       .column("c", DataTypes.STRING())
-    *                                       .build())
-    *                               .option("path", sink_path)
-    *                               .format(FormatDescriptor.for_format("csv")
-    *                                       .option("field-delimiter", ",")
-    *                                       .build())
-    *                               .build())
-    *
-    * t.select("a + 1, b, c").insert_into("stream_sink")
-    *
-    * st_env.execute("stream_job")
+```
+import os
+import shutil
+import tempfile
+
+sink_path = tempfile.gettempdir() + '/streaming.csv'
+if os.path.exists(sink_path):
+    if os.path.isfile(sink_path):
+        os.remove(sink_path)
+    else:
+        shutil.rmtree(sink_path)
+
+s_env.set_parallelism(1)
+t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
+
+st_env.create_temporary_table("stream_sink", TableDescriptor.for_connector("filesystem")
+                              .schema(Schema.new_builder()
+                                      .column("a", DataTypes.BIGINT())
+                                      .column("b", DataTypes.STRING())
+                                      .column("c", DataTypes.STRING())
+                                      .build())
+                              .option("path", sink_path)
+                              .format(FormatDescriptor.for_format("csv")
+                                      .option("field-delimiter", ",")
+                                      .build())
+                              .build())
+
+t.select(col('a') + 1, col('b'), col('c')).insert_into("stream_sink")
+st_env.execute("stream_job")
+
+# show the results
+with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
+    print(f.read())
+
+```
 '''
 utf8_out.write(welcome_msg)
 
diff --git a/flink-python/pyflink/table/tests/test_shell_example.py b/flink-python/pyflink/table/tests/test_shell_example.py
index 4ad54f2be91..2fa9de2ca78 100644
--- a/flink-python/pyflink/table/tests/test_shell_example.py
+++ b/flink-python/pyflink/table/tests/test_shell_example.py
@@ -53,7 +53,8 @@ class ShellExampleTests(PyFlinkTestCase):
                                               .build())
                                       .build())
 
-        t.select("a + 1, b, c").execute_insert("stream_sink").wait()
+        from pyflink.table.expressions import col
+        t.select(col('a') + 1, col('b'), col('c')).execute_insert("stream_sink").wait()
 
         # verify code, do not copy these code to shell.py
         with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:

Reply via email to