This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new beb7009 [FLINK-24267][python][docs] Update the example tutorial to use latest API
beb7009 is described below
commit beb700978f6ce9e8bc8c1fc2c7aae738071bd2f2
Author: Dian Fu <[email protected]>
AuthorDate: Tue Sep 14 11:29:20 2021 +0800
[FLINK-24267][python][docs] Update the example tutorial to use latest API
This closes #17273.
---
.../docs/dev/python/datastream_tutorial.md | 199 +++++++++++---
.../docs/dev/python/table_api_tutorial.md | 301 ++++++++++++++-------
.../content/docs/dev/python/datastream_tutorial.md | 196 +++++++++++---
docs/content/docs/dev/python/table_api_tutorial.md | 275 +++++++++++++------
4 files changed, 699 insertions(+), 272 deletions(-)
diff --git a/docs/content.zh/docs/dev/python/datastream_tutorial.md b/docs/content.zh/docs/dev/python/datastream_tutorial.md
index d5fb302..c3c1f0b 100644
--- a/docs/content.zh/docs/dev/python/datastream_tutorial.md
+++ b/docs/content.zh/docs/dev/python/datastream_tutorial.md
@@ -32,16 +32,16 @@ Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流
## 你要搭建一个什么系统
在本教程中,你将学习如何编写一个简单的 Python DataStream 作业。
-例子是从非空集合中读取数据,并将结果写入本地文件系统。
+该程序读取一个 csv 文件,计算词频,并将结果写到一个结果文件中。
## 准备条件
-本教程假设你对 Python 有一定的熟悉,但是即使你使用的是不同编程语言,你也应该能够学会。
+本教程假设你对 Python 有一定的了解,但是即使你使用的是其它编程语言,你也应该能够学会。
## 困难求助
如果你有疑惑,可以查阅 [社区支持资源](https://flink.apache.org/zh/community.html)。
-特别是,Apache Flink
[用户邮件列表](https://flink.apache.org/zh/community.html#mailing-lists)
一直被评为所有Apache项目中最活跃的一个,也是快速获得帮助的好方法。
+特别是,Apache Flink
[用户邮件列表](https://flink.apache.org/zh/community.html#mailing-lists) 一直被评为所有
Apache 项目中最活跃的一个,也是快速获得帮助的好方法。
## 怎样跟着教程练习
@@ -64,82 +64,197 @@ DataStream API 应用程序首先需要声明一个执行环境(`StreamExecuti
```python
env = StreamExecutionEnvironment.get_execution_environment()
+env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)
```
一旦创建了 `StreamExecutionEnvironment` 之后,你可以使用它来声明数据源。数据源从外部系统(如 Apache
Kafka、Rabbit MQ 或 Apache Pulsar)拉取数据到 Flink 作业里。
-为了简单起见,本教程使用元素集合作为数据源。
+为了简单起见,本教程读取文件作为数据源。
```python
-ds = env.from_collection(
- collection=[(1, 'aaa'), (2, 'bbb')],
- type_info=Types.ROW([Types.INT(), Types.STRING()]))
+ds = env.from_source(
+ source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
+ input_path)
+ .process_static_file_set().build(),
+ watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
+ source_name="file_source"
+)
```
-这里从相同类型数据集合中创建数据流(一个带有 INT 和 STRING 类型字段的 `ROW` 类型)。
-
-你现在可以在这个数据流上执行转换操作,或者使用 _sink_ 将数据写入外部系统。本教程使用 `StreamingFileSink` 将数据写入
`/tmp/output` 文件目录中。
+你现在可以在这个数据流上执行转换操作,或者使用 _sink_ 将数据写入外部系统。本教程使用 `FileSink` 将结果数据写入文件中。
```python
-ds.add_sink(StreamingFileSink
- .for_row_format('/tmp/output', Encoder.simple_string_encoder())
- .build())
+ds.sink_to(
+ sink=FileSink.for_row_format(
+ base_path=output_path,
+ encoder=Encoder.simple_string_encoder())
+ .with_output_file_config(
+ OutputFileConfig.builder()
+ .with_part_prefix("prefix")
+ .with_part_suffix(".ext")
+ .build())
+ .with_rolling_policy(RollingPolicy.default_rolling_policy())
+ .build()
+)
+
+def split(line):
+ yield from line.split()
+
+# compute word count
+ds = ds.flat_map(split) \
+ .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
+ .key_by(lambda i: i[0]) \
+ .reduce(lambda i, j: (i[0], i[1] + j[1]))
```
-最后一步是执行真实的 PyFlink DataStream API 作业。PyFlink applications
是懒加载的,并且只有在完全构建之后才会提交给集群上执行。要执行一个应用程序,你只需简单地调用 `env.execute(job_name)`。
+最后一步是执行 PyFlink DataStream API 作业。PyFlink applications
是懒加载的,并且只有在完全构建之后才会提交给集群上执行。要执行一个应用程序,你只需简单地调用 `env.execute()`。
```python
-env.execute("tutorial_job")
+env.execute()
```
完整的代码如下:
```python
-from pyflink.common.serialization import Encoder
-from pyflink.common.typeinfo import Types
-from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import StreamingFileSink
-
-
-def tutorial():
+import argparse
+import logging
+import sys
+
+from pyflink.common import WatermarkStrategy, Encoder, Types
+from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
+from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,
+ RollingPolicy)
+
+
+word_count_data = ["To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer",
+ "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,",
+ "And by opposing end them?--To die,--to sleep,--",
+ "No more; and by a sleep to say we end",
+ "The heartache, and the thousand natural shocks",
+ "That flesh is heir to,--'tis a consummation",
+ "Devoutly to be wish'd. To die,--to sleep;--",
+ "To sleep! perchance to dream:--ay, there's the rub;",
+ "For in that sleep of death what dreams may come,",
+ "When we have shuffled off this mortal coil,",
+ "Must give us pause: there's the respect",
+ "That makes calamity of so long life;",
+ "For who would bear the whips and scorns of time,",
+ "The oppressor's wrong, the proud man's contumely,",
+ "The pangs of despis'd love, the law's delay,",
+ "The insolence of office, and the spurns",
+ "That patient merit of the unworthy takes,",
+ "When he himself might his quietus make",
+ "With a bare bodkin? who would these fardels bear,",
+ "To grunt and sweat under a weary life,",
+ "But that the dread of something after death,--",
+ "The undiscover'd country, from whose bourn",
+ "No traveller returns,--puzzles the will,",
+ "And makes us rather bear those ills we have",
+ "Than fly to others that we know not of?",
+ "Thus conscience does make cowards of us all;",
+ "And thus the native hue of resolution",
+ "Is sicklied o'er with the pale cast of thought;",
+ "And enterprises of great pith and moment,",
+ "With this regard, their currents turn awry,",
+ "And lose the name of action.--Soft you now!",
+ "The fair Ophelia!--Nymph, in thy orisons",
+ "Be all my sins remember'd."]
+
+
+def word_count(input_path, output_path):
env = StreamExecutionEnvironment.get_execution_environment()
+ env.set_runtime_mode(RuntimeExecutionMode.BATCH)
+ # write all the data to one file
env.set_parallelism(1)
- ds = env.from_collection(
- collection=[(1, 'aaa'), (2, 'bbb')],
- type_info=Types.ROW([Types.INT(), Types.STRING()]))
- ds.add_sink(StreamingFileSink
- .for_row_format('/tmp/output', Encoder.simple_string_encoder())
+
+ # define the source
+ if input_path is not None:
+ ds = env.from_source(
+ source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
+ input_path)
+ .process_static_file_set().build(),
+ watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
+ source_name="file_source"
+ )
+ else:
+ print("Executing word_count example with default input data set.")
+ print("Use --input to specify file input.")
+ ds = env.from_collection(word_count_data)
+
+ def split(line):
+ yield from line.split()
+
+ # compute word count
+ ds = ds.flat_map(split) \
+ .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
+ .key_by(lambda i: i[0]) \
+ .reduce(lambda i, j: (i[0], i[1] + j[1]))
+
+ # define the sink
+ if output_path is not None:
+ ds.sink_to(
+ sink=FileSink.for_row_format(
+ base_path=output_path,
+ encoder=Encoder.simple_string_encoder())
+ .with_output_file_config(
+ OutputFileConfig.builder()
+ .with_part_prefix("prefix")
+ .with_part_suffix(".ext")
.build())
- env.execute("tutorial_job")
+ .with_rolling_policy(RollingPolicy.default_rolling_policy())
+ .build()
+ )
+ else:
+ print("Printing result to stdout. Use --output to specify output path.")
+ ds.print()
+
+ # submit for execution
+ env.execute()
if __name__ == '__main__':
- tutorial()
+ logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--input',
+ dest='input',
+ required=False,
+ help='Input file to process.')
+ parser.add_argument(
+ '--output',
+ dest='output',
+ required=False,
+ help='Output file to write results to.')
+
+ argv = sys.argv[1:]
+ known_args, _ = parser.parse_known_args(argv)
+
+ word_count(known_args.input, known_args.output)
```
## 执行一个 Flink Python DataStream API 程序
-现在你已经编写好 PyFlink 程序,可以运行它了!首先,需要确保输出目录不存在:
-
-```bash
-rm -rf /tmp/output
-```
-
-接下来,可以使用如下命令运行刚刚创建的示例:
+现在你已经编写好 PyFlink 程序,可以通过如下命令执行它:
```bash
-$ python datastream_tutorial.py
+$ python word_count.py
```
这个命令会在本地集群中构建并运行 PyFlink 程序。你也可以使用 [Job Submission Examples]({{< ref
"docs/deployment/cli" >}}#submitting-pyflink-jobs) 中描述的命令将其提交到远程集群。
-最后,你可以在命令行上看到执行结果:
+最后,你可以得到如下运行结果:
```bash
-$ find /tmp/output -type f -exec cat {} \;
-1,aaa
-2,bbb
+(a,5)
+(Be,1)
+(Is,1)
+(No,2)
+...
```
-本教程为你开始编写自己的 PyFlink DataStream API 程序提供了基础。如果需要了解更多关于 Python DataStream API
的使用,请查阅 {{< pythondoc name="Flink Python API 文档">}}。
+本教程为你开始编写自己的 PyFlink DataStream API 程序提供了基础。你也可以访问 {{< gh_link
file="flink-python/pyflink/examples" name="PyFlink 示例" >}},了解更多关于 PyFlink 的示例。
+如果需要了解更多关于 Python DataStream API 的使用,请查阅 {{< pythondoc name="Flink Python API
文档">}}。
diff --git a/docs/content.zh/docs/dev/python/table_api_tutorial.md b/docs/content.zh/docs/dev/python/table_api_tutorial.md
index 04f45f6..5d4ab31 100644
--- a/docs/content.zh/docs/dev/python/table_api_tutorial.md
+++ b/docs/content.zh/docs/dev/python/table_api_tutorial.md
@@ -33,177 +33,274 @@ Apache Flink 提供 Table API 关系型 API 来统一处理流和批,即查询
## 概要
-在该教程中,我们会从零开始,介绍如何创建一个Flink Python项目及运行Python Table
API程序。该程序读取一个csv文件,处理后,将结果写到一个结果csv文件中。
+在该教程中,我们会从零开始,介绍如何创建一个 Flink Python 项目及运行 Python Table API 作业。该作业读取一个 csv
文件,计算词频,并将结果写到一个结果文件中。
## 先决条件
-本练习假定您对Python有一定的了解,但是即使您来自其他编程语言,也应该能够继续学习。
-它还假定您熟悉基本的关系操作,例如`SELECT`和`GROUP BY`子句。
+本练习假定你对 Python 有一定的了解,但是即使你来自其他编程语言,也应该能够继续学习。
+它还假定你熟悉基本的关系操作,例如 `SELECT` 和 `GROUP BY` 子句。
## 如何寻求帮助
-如果您遇到问题,可以访问 [社区信息页面](https://flink.apache.org/zh/community.html)。
-与此同时,Apache Flink
的[用户邮件列表](https://flink.apache.org/zh/community.html#mailing-lists)
一直被列为Apache项目中最活跃的项目邮件列表之一,也是快速获得帮助的好方法。
+如果你遇到问题,可以访问 [社区信息页面](https://flink.apache.org/zh/community.html)。
+与此同时,Apache Flink
的[用户邮件列表](https://flink.apache.org/zh/community.html#mailing-lists) 一直被列为
Apache 项目中最活跃的项目邮件列表之一,也是快速获得帮助的好方法。
## 继续我们的旅程
-如果要继续我们的旅程,您需要一台具有以下功能的计算机:
+如果要继续我们的旅程,你需要一台具有以下功能的计算机:
* Java 8 or 11
* Python 3.6, 3.7 or 3.8
-使用Python Table API需要安装PyFlink,它已经被发布到
[PyPi](https://pypi.org/project/apache-flink/),您可以通过如下方式安装PyFlink:
+使用 Python Table API 需要安装 PyFlink,它已经被发布到
[PyPi](https://pypi.org/project/apache-flink/),你可以通过如下方式安装 PyFlink:
```bash
$ python -m pip install apache-flink
```
-安装PyFlink后,您便可以编写Python Table API作业了。
+安装 PyFlink 后,你便可以编写 Python Table API 作业了。
-## 编写一个Flink Python Table API程序
+## 编写一个 Flink Python Table API 程序
-编写Flink Python Table API程序的第一步是创建`TableEnvironment`。这是Python Table API作业的入口类。
+编写 Flink Python Table API 程序的第一步是创建 `TableEnvironment`。这是 Python Table API
作业的入口类。
```python
-settings = EnvironmentSettings.in_batch_mode()
-t_env = TableEnvironment.create(settings)
-
-# write all the data to one file
+t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")
```
接下来,我们将介绍如何创建源表和结果表。
```python
-t_env.create_temporary_table('mySource',
TableDescriptor.for_connector('filesystem')
- .schema(Schema.new_builder()
- .column('word', DataTypes.STRING())
- .build())
- .option('path', '/tmp/input')
- .format('csv')
- .build())
-
-t_env.create_temporary_table('mySink',
TableDescriptor.for_connector('filesystem')
- .schema(Schema.new_builder()
- .column('word', DataTypes.STRING())
- .column('count', DataTypes.BIGINT())
+t_env.create_temporary_table(
+ 'source',
+ TableDescriptor.for_connector('filesystem')
+ .schema(Schema.new_builder()
+ .column('word', DataTypes.STRING())
+ .build())
+ .option('path', input_path)
+ .format('csv')
.build())
- .option('path', '/tmp/output')
- .format(FormatDescriptor.for_format('csv')
- .option('field-delimiter', '\t')
+tab = t_env.from_path('source')
+
+t_env.create_temporary_table(
+ 'sink',
+ TableDescriptor.for_connector('filesystem')
+ .schema(Schema.new_builder()
+ .column('word', DataTypes.STRING())
+ .column('count', DataTypes.BIGINT())
+ .build())
+ .option('path', output_path)
+ .format(FormatDescriptor.for_format('canal-json')
+ .build())
.build())
- .build())
```
-You can also use the TableEnvironment.sql_update() method to register a
source/sink table defined in DDL:
+你也可以使用 `TableEnvironment.execute_sql()` 方法,通过 DDL 语句来注册源表和结果表:
```python
my_source_ddl = """
- create table mySource (
- word VARCHAR
+ create table source (
+ word STRING
) with (
'connector' = 'filesystem',
'format' = 'csv',
- 'path' = '/tmp/input'
+ 'path' = '{}'
)
-"""
+""".format(input_path)
my_sink_ddl = """
- create table mySink (
- word VARCHAR,
+ create table sink (
+ word STRING,
`count` BIGINT
) with (
'connector' = 'filesystem',
- 'format' = 'csv',
- 'path' = '/tmp/output'
+ 'format' = 'canal-json',
+ 'path' = '{}'
)
-"""
+""".format(output_path)
t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)
```
-上面的程序展示了如何创建及在`ExecutionEnvironment`中注册表名分别为`mySource`和`mySink`的表。
-其中,源表`mySource`有一列: word,该表代表了从输入文件`/tmp/input`中读取的单词;
-结果表`mySink`有两列: word和count,该表会将计算结果输出到文件`/tmp/output`中,字段之间使用`\t`作为分隔符。
+上面的程序展示了如何创建及注册表名分别为 `source` 和 `sink` 的表。
+其中,源表 `source` 有一列: word,该表代表了从 `input_path` 所指定的输入文件中读取的单词;
+结果表 `sink` 有两列: word 和 count,该表的结果会输出到 `output_path` 所指定的输出文件中。
-接下来,我们介绍如何创建一个作业:该作业读取表`mySource`中的数据,进行一些变换,然后将结果写入表`mySink`。
+接下来,我们介绍如何创建一个作业:该作业读取表 `source` 中的数据,进行一些变换,然后将结果写入表 `sink`。
-最后,需要做的就是启动Flink Python Table API作业。上面所有的操作,比如创建源表
-进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当`execute_insert(sink_name)`被调用的时候,
+最后,需要做的就是启动 Flink Python Table API 作业。上面所有的操作,比如创建源表
+进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当 `execute_insert(sink_name)` 被调用的时候,
作业才会被真正提交到集群或者本地进行执行。
```python
-from pyflink.table.expressions import lit
-
-tab = t_env.from_path('mySource')
-tab.group_by(tab.word) \
- .select(tab.word, lit(1).count) \
- .execute_insert('mySink').wait()
+@udtf(result_types=[DataTypes.STRING()])
+def split(line: Row):
+ for s in line[0].split():
+ yield Row(s)
+
+# 计算 word count
+tab.flat_map(split).alias('word') \
+ .group_by(col('word')) \
+ .select(col('word'), lit(1).count) \
+ .execute_insert('sink') \
+ .wait()
```
该教程的完整代码如下:
```python
-from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
-from pyflink.table.descriptors import Schema, FileSystem
-from pyflink.table.expressions import lit
-
-settings = EnvironmentSettings.in_batch_mode()
-t_env = TableEnvironment.create(settings)
-
-# write all the data to one file
-t_env.get_config().get_configuration().set_string("parallelism.default", "1")
-
-t_env.create_temporary_table('mySource',
TableDescriptor.for_connector('filesystem')
- .schema(Schema.new_builder()
- .column('word', DataTypes.STRING())
- .build())
- .option('path', '/tmp/input')
- .format('csv')
- .build())
-
-t_env.create_temporary_table('mySink',
TableDescriptor.for_connector('filesystem')
- .schema(Schema.new_builder()
- .column('word', DataTypes.STRING())
- .column('count', DataTypes.BIGINT())
- .build())
- .option('path', '/tmp/output')
- .format(FormatDescriptor.for_format('csv')
- .option('field-delimiter', '\t')
- .build())
- .build())
-
-tab = t_env.from_path('mySource')
-tab.group_by(tab.word) \
- .select(tab.word, lit(1).count) \
- .execute_insert('mySink').wait()
+import argparse
+import logging
+import sys
+
+from pyflink.common import Row
+from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
+ DataTypes, FormatDescriptor)
+from pyflink.table.expressions import lit, col
+from pyflink.table.udf import udtf
+
+word_count_data = ["To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer",
+ "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,",
+ "And by opposing end them?--To die,--to sleep,--",
+ "No more; and by a sleep to say we end",
+ "The heartache, and the thousand natural shocks",
+ "That flesh is heir to,--'tis a consummation",
+ "Devoutly to be wish'd. To die,--to sleep;--",
+ "To sleep! perchance to dream:--ay, there's the rub;",
+ "For in that sleep of death what dreams may come,",
+ "When we have shuffled off this mortal coil,",
+ "Must give us pause: there's the respect",
+ "That makes calamity of so long life;",
+ "For who would bear the whips and scorns of time,",
+ "The oppressor's wrong, the proud man's contumely,",
+ "The pangs of despis'd love, the law's delay,",
+ "The insolence of office, and the spurns",
+ "That patient merit of the unworthy takes,",
+ "When he himself might his quietus make",
+ "With a bare bodkin? who would these fardels bear,",
+ "To grunt and sweat under a weary life,",
+ "But that the dread of something after death,--",
+ "The undiscover'd country, from whose bourn",
+ "No traveller returns,--puzzles the will,",
+ "And makes us rather bear those ills we have",
+ "Than fly to others that we know not of?",
+ "Thus conscience does make cowards of us all;",
+ "And thus the native hue of resolution",
+ "Is sicklied o'er with the pale cast of thought;",
+ "And enterprises of great pith and moment,",
+ "With this regard, their currents turn awry,",
+ "And lose the name of action.--Soft you now!",
+ "The fair Ophelia!--Nymph, in thy orisons",
+ "Be all my sins remember'd."]
+
+
+def word_count(input_path, output_path):
+ t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
+ # write all the data to one file
+ t_env.get_config().get_configuration().set_string("parallelism.default", "1")
+
+ # define the source
+ if input_path is not None:
+ t_env.create_temporary_table(
+ 'source',
+ TableDescriptor.for_connector('filesystem')
+ .schema(Schema.new_builder()
+ .column('word', DataTypes.STRING())
+ .build())
+ .option('path', input_path)
+ .format('csv')
+ .build())
+ tab = t_env.from_path('source')
+ else:
+ print("Executing word_count example with default input data set.")
+ print("Use --input to specify file input.")
+ tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
+ DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))
+
+ # define the sink
+ if output_path is not None:
+ t_env.create_temporary_table(
+ 'sink',
+ TableDescriptor.for_connector('filesystem')
+ .schema(Schema.new_builder()
+ .column('word', DataTypes.STRING())
+ .column('count', DataTypes.BIGINT())
+ .build())
+ .option('path', output_path)
+ .format(FormatDescriptor.for_format('canal-json')
+ .build())
+ .build())
+ else:
+ print("Printing result to stdout. Use --output to specify output path.")
+ t_env.create_temporary_table(
+ 'sink',
+ TableDescriptor.for_connector('print')
+ .schema(Schema.new_builder()
+ .column('word', DataTypes.STRING())
+ .column('count', DataTypes.BIGINT())
+ .build())
+ .build())
+
+ @udtf(result_types=[DataTypes.STRING()])
+ def split(line: Row):
+ for s in line[0].split():
+ yield Row(s)
+
+ # compute word count
+ tab.flat_map(split).alias('word') \
+ .group_by(col('word')) \
+ .select(col('word'), lit(1).count) \
+ .execute_insert('sink') \
+ .wait()
+ # remove .wait if submitting to a remote cluster, refer to
+ # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
+ # for more details
+
+
+if __name__ == '__main__':
+ logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--input',
+ dest='input',
+ required=False,
+ help='Input file to process.')
+ parser.add_argument(
+ '--output',
+ dest='output',
+ required=False,
+ help='Output file to write results to.')
+
+ argv = sys.argv[1:]
+ known_args, _ = parser.parse_known_args(argv)
+
+ word_count(known_args.input, known_args.output)
```
-## 执行一个Flink Python Table API程序
-
-首先,你需要在文件 “/tmp/input” 中准备好输入数据。你可以选择通过如下命令准备输入数据:
-
-```bash
-$ mkdir /tmp/input
-$ echo -e "flink\npyflink\nflink" > /tmp/input/input.csv
-```
+## 执行一个 Flink Python Table API 程序
-接下来,可以在命令行中运行作业(假设作业名为WordCount.py)(注意:如果输出结果文件“/tmp/output”已经存在,你需要先删除文件,否则程序将无法正确运行起来):
+接下来,可以在命令行中运行作业(假设作业名为 word_count.py):
```bash
-$ python WordCount.py
+$ python word_count.py
```
-上述命令会构建Python Table API程序,并在本地mini cluster中运行。如果想将作业提交到远端集群执行,
+上述命令会构建 Python Table API 程序,并在本地 mini cluster 中运行。如果想将作业提交到远端集群执行,
可以参考[作业提交示例]({{< ref "docs/deployment/cli" >}}#submitting-pyflink-jobs)。
-最后,你可以通过如下命令查看你的运行结果:
+最后,你可以得到如下运行结果:
```bash
-$ cat /tmp/output/*
-flink 2
-pyflink 1
++I[To, 1]
++I[be,, 1]
++I[or, 1]
++I[not, 1]
+...
```
-上述教程介绍了如何编写并运行一个Flink Python Table API程序,如果想了解Flink Python Table API
-的更多信息,可以参考{{< pythondoc name="Flink Python API 文档">}}。
+上述教程介绍了如何编写并运行一个 Flink Python Table API 程序,你也可以访问 {{< gh_link
file="flink-python/pyflink/examples" name="PyFlink 示例" >}},了解更多关于 PyFlink 的示例。
+如果想了解 Flink Python Table API 的更多信息,可以参考{{< pythondoc name="Flink Python API
文档">}}。
diff --git a/docs/content/docs/dev/python/datastream_tutorial.md b/docs/content/docs/dev/python/datastream_tutorial.md
index 8fe64d1..bbb0161 100644
--- a/docs/content/docs/dev/python/datastream_tutorial.md
+++ b/docs/content/docs/dev/python/datastream_tutorial.md
@@ -31,7 +31,7 @@ Apache Flink offers a DataStream API for building robust,
stateful streaming app
## What Will You Be Building?
In this tutorial, you will learn how to write a simple Python DataStream
pipeline.
-The pipeline will read data from a non-empty collection and write the results
to the local file system.
+The pipeline will read data from a CSV file, compute the word frequency, and write the results to an output file.
## Prerequisites
@@ -63,82 +63,198 @@ DataStream API applications begin by declaring an
execution environment (`Stream
```python
env = StreamExecutionEnvironment.get_execution_environment()
+env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)
```
Once a `StreamExecutionEnvironment` is created, you can use it to declare your
_source_. Sources ingest data from external systems, such as Apache Kafka,
Rabbit MQ, or Apache Pulsar, into Flink Jobs.
-To keep things simple, this walkthrough uses a source that is backed by a
collection of elements.
+To keep things simple, this walkthrough uses a source that reads data from a file.
```python
-ds = env.from_collection(
- collection=[(1, 'aaa'), (2, 'bbb')],
- type_info=Types.ROW([Types.INT(), Types.STRING()]))
+ds = env.from_source(
+ source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
+ input_path)
+ .process_static_file_set().build(),
+ watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
+ source_name="file_source"
+)
```
-This creates a data stream from the given collection, with the same type as
that of the elements in it (here, a `ROW` type with a INT field and a STRING
field).
-
-You can now perform transformations on this data stream, or just write the
data to an external system using a _sink_. This walkthrough uses the
`StreamingFileSink` sink connector to write the data into a file in the
`/tmp/output` directory.
+You can now perform transformations on this data stream, or just write the
data to an external system using a _sink_. This walkthrough uses the `FileSink`
sink connector to write the data into a file.
```python
-ds.add_sink(StreamingFileSink
- .for_row_format('/tmp/output', Encoder.simple_string_encoder())
- .build())
+ds.sink_to(
+ sink=FileSink.for_row_format(
+ base_path=output_path,
+ encoder=Encoder.simple_string_encoder())
+ .with_output_file_config(
+ OutputFileConfig.builder()
+ .with_part_prefix("prefix")
+ .with_part_suffix(".ext")
+ .build())
+ .with_rolling_policy(RollingPolicy.default_rolling_policy())
+ .build()
+)
+
+def split(line):
+ yield from line.split()
+
+# compute word count
+ds = ds.flat_map(split) \
+ .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
+ .key_by(lambda i: i[0]) \
+ .reduce(lambda i, j: (i[0], i[1] + j[1]))
```
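For intuition, the `flat_map` / `map` / `key_by` / `reduce` chain above can be mimicked in plain Python without Flink. The following is only a minimal sketch of the semantics, not how Flink executes the job: `word_count_local` is a hypothetical helper name, and sorting plus `itertools.groupby` merely stands in for `key_by`.

```python
from functools import reduce
from itertools import groupby


def split(line):
    # Same generator as in the tutorial: one token per whitespace-separated word.
    yield from line.split()


def word_count_local(lines):
    """Plain-Python stand-in for flat_map -> map -> key_by -> reduce (hypothetical helper)."""
    # flat_map(split): flatten every line into individual tokens
    tokens = [word for line in lines for word in split(line)]
    # map(lambda i: (i, 1)): pair each token with an initial count of 1
    pairs = [(word, 1) for word in tokens]
    # key_by(lambda i: i[0]): group pairs by word (groupby needs sorted input)
    pairs.sort(key=lambda p: p[0])
    result = []
    for _, group in groupby(pairs, key=lambda p: p[0]):
        # reduce(lambda i, j: (i[0], i[1] + j[1])): sum the counts within one key
        result.append(reduce(lambda i, j: (i[0], i[1] + j[1]), group))
    return result


if __name__ == "__main__":
    print(word_count_local(["to be or not to be", "to sleep"]))
```

Because `split` only splits on whitespace, punctuation stays attached to a token, so `be` and `be,` are counted as different words.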
-The last step is to execute the actual PyFlink DataStream API job. PyFlink
applications are built lazily and shipped to the cluster for execution only
once fully formed. To execute an application, you simply call
`env.execute(job_name)`.
+The last step is to execute the actual PyFlink DataStream API job. PyFlink
applications are built lazily and shipped to the cluster for execution only
once fully formed. To execute an application, you simply call `env.execute()`.
```python
-env.execute("tutorial_job")
+env.execute()
```
The complete code so far:
```python
-from pyflink.common.serialization import Encoder
-from pyflink.common.typeinfo import Types
-from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import StreamingFileSink
-
-
-def tutorial():
+import argparse
+import logging
+import sys
+
+from pyflink.common import WatermarkStrategy, Encoder, Types
+from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
+from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,
+ RollingPolicy)
+
+
+word_count_data = ["To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer",
+ "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,",
+ "And by opposing end them?--To die,--to sleep,--",
+ "No more; and by a sleep to say we end",
+ "The heartache, and the thousand natural shocks",
+ "That flesh is heir to,--'tis a consummation",
+ "Devoutly to be wish'd. To die,--to sleep;--",
+ "To sleep! perchance to dream:--ay, there's the rub;",
+ "For in that sleep of death what dreams may come,",
+ "When we have shuffled off this mortal coil,",
+ "Must give us pause: there's the respect",
+ "That makes calamity of so long life;",
+ "For who would bear the whips and scorns of time,",
+ "The oppressor's wrong, the proud man's contumely,",
+ "The pangs of despis'd love, the law's delay,",
+ "The insolence of office, and the spurns",
+ "That patient merit of the unworthy takes,",
+ "When he himself might his quietus make",
+ "With a bare bodkin? who would these fardels bear,",
+ "To grunt and sweat under a weary life,",
+ "But that the dread of something after death,--",
+ "The undiscover'd country, from whose bourn",
+ "No traveller returns,--puzzles the will,",
+ "And makes us rather bear those ills we have",
+ "Than fly to others that we know not of?",
+ "Thus conscience does make cowards of us all;",
+ "And thus the native hue of resolution",
+ "Is sicklied o'er with the pale cast of thought;",
+ "And enterprises of great pith and moment,",
+ "With this regard, their currents turn awry,",
+ "And lose the name of action.--Soft you now!",
+ "The fair Ophelia!--Nymph, in thy orisons",
+ "Be all my sins remember'd."]
+
+
+def word_count(input_path, output_path):
env = StreamExecutionEnvironment.get_execution_environment()
+ env.set_runtime_mode(RuntimeExecutionMode.BATCH)
+ # write all the data to one file
env.set_parallelism(1)
- ds = env.from_collection(
- collection=[(1, 'aaa'), (2, 'bbb')],
- type_info=Types.ROW([Types.INT(), Types.STRING()]))
- ds.add_sink(StreamingFileSink
- .for_row_format('/tmp/output', Encoder.simple_string_encoder())
+
+ # define the source
+ if input_path is not None:
+ ds = env.from_source(
+ source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
+ input_path)
+ .process_static_file_set().build(),
+ watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
+ source_name="file_source"
+ )
+ else:
+ print("Executing word_count example with default input data set.")
+ print("Use --input to specify file input.")
+ ds = env.from_collection(word_count_data)
+
+ def split(line):
+ yield from line.split()
+
+ # compute word count
+ ds = ds.flat_map(split) \
+ .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
+ .key_by(lambda i: i[0]) \
+ .reduce(lambda i, j: (i[0], i[1] + j[1]))
+
+ # define the sink
+ if output_path is not None:
+ ds.sink_to(
+ sink=FileSink.for_row_format(
+ base_path=output_path,
+ encoder=Encoder.simple_string_encoder())
+ .with_output_file_config(
+ OutputFileConfig.builder()
+ .with_part_prefix("prefix")
+ .with_part_suffix(".ext")
.build())
- env.execute("tutorial_job")
+ .with_rolling_policy(RollingPolicy.default_rolling_policy())
+ .build()
+ )
+ else:
+ print("Printing result to stdout. Use --output to specify output path.")
+ ds.print()
+
+ # submit for execution
+ env.execute()
if __name__ == '__main__':
- tutorial()
+ logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--input',
+ dest='input',
+ required=False,
+ help='Input file to process.')
+ parser.add_argument(
+ '--output',
+ dest='output',
+ required=False,
+ help='Output file to write results to.')
+
+ argv = sys.argv[1:]
+ known_args, _ = parser.parse_known_args(argv)
+
+ word_count(known_args.input, known_args.output)
```
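The command-line handling in the listing above relies on `parse_known_args`, which returns unrecognized arguments instead of failing on them. A minimal sketch of that behaviour in isolation, assuming a hypothetical wrapper named `parse_paths` (it is not part of the tutorial):

```python
import argparse


def parse_paths(argv):
    """Parse --input/--output as the tutorial does; both flags are optional (hypothetical wrapper)."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', required=False,
                        help='Input file to process.')
    parser.add_argument('--output', dest='output', required=False,
                        help='Output file to write results to.')
    # parse_known_args returns (namespace, leftover_args) rather than
    # exiting with an error when it meets a flag it does not know.
    known_args, unknown = parser.parse_known_args(argv)
    return known_args.input, known_args.output, unknown


if __name__ == "__main__":
    print(parse_paths(['--input', '/tmp/in.txt', '--extra']))
```

When neither flag is given, both values are `None`, which is what makes `word_count` fall back to the built-in data set and to printing on stdout.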
## Executing a Flink Python DataStream API Program
-Now that you defined your PyFlink program, you can run it! First, make sure
that the output directory doesn't exist:
-
-```bash
-rm -rf /tmp/output
-```
-
-Next, you can run the example you just created on the command line:
+Now that you have defined your PyFlink program, you can run it from the command line:
```bash
-$ python datastream_tutorial.py
+$ python word_count.py
```
The command builds and runs your PyFlink program in a local mini cluster. You
can alternatively submit it to a remote cluster using the instructions detailed
in [Job Submission Examples]({{< ref "docs/deployment/cli"
>}}#submitting-pyflink-jobs).
+Finally, you will see execution results similar to the following:
+Finally, you can see the execution results similar to the following:
```bash
-$ find /tmp/output -type f -exec cat {} \;
-1,aaa
-2,bbb
+(a,5)
+(Be,1)
+(Is,1)
+(No,2)
+...
```
-This walkthrough gives you the foundations to get started writing your own
PyFlink DataStream API programs. To learn more about the Python DataStream API,
please refer to {{< pythondoc name="Flink Python API Docs">}} for more details.
+This walkthrough gives you the foundations to get started writing your own
PyFlink DataStream API programs.
+You can also refer to {{< gh_link file="flink-python/pyflink/examples"
name="PyFlink Examples" >}} for more examples.
+To learn more about the Python DataStream API, please refer to {{< pythondoc name="Flink Python API Docs">}}.
diff --git a/docs/content/docs/dev/python/table_api_tutorial.md b/docs/content/docs/dev/python/table_api_tutorial.md
index aec9034..e6964af 100644
--- a/docs/content/docs/dev/python/table_api_tutorial.md
+++ b/docs/content/docs/dev/python/table_api_tutorial.md
@@ -34,7 +34,7 @@ Apache Flink offers a Table API as a unified, relational API
for batch and strea
## What Will You Be Building?
In this tutorial, you will learn how to build a pure Python Flink Table API
pipeline.
-The pipeline will read data from an input csv file and write the results to an
output csv file.
+The pipeline will read data from an input csv file, compute the word frequency
and write the results to an output file.
## Prerequisites
@@ -69,132 +69,228 @@ It can be used for setting execution parameters such as
restart strategy, defaul
The table config allows setting Table API specific configurations.
```python
-settings = EnvironmentSettings.in_batch_mode()
-t_env = TableEnvironment.create(settings)
-
-# write all the data to one file
+t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")
```
-You can now create source and sink tables:
+You can now create the source and sink tables:
```python
-t_env.create_temporary_table('mySource',
TableDescriptor.for_connector('filesystem')
- .schema(Schema.new_builder()
- .column('word', DataTypes.STRING())
- .build())
- .option('path', '/tmp/input')
- .format('csv')
- .build())
-
-t_env.create_temporary_table('mySink', TableDescriptor.for_connector('filesystem')
- .schema(Schema.new_builder()
- .column('word', DataTypes.STRING())
- .column('count', DataTypes.BIGINT())
+t_env.create_temporary_table(
+ 'source',
+ TableDescriptor.for_connector('filesystem')
+ .schema(Schema.new_builder()
+ .column('word', DataTypes.STRING())
+ .build())
+ .option('path', input_path)
+ .format('csv')
.build())
- .option('path', '/tmp/output')
- .format(FormatDescriptor.for_format('csv')
- .option('field-delimiter', '\t')
+tab = t_env.from_path('source')
+
+t_env.create_temporary_table(
+ 'sink',
+ TableDescriptor.for_connector('filesystem')
+ .schema(Schema.new_builder()
+ .column('word', DataTypes.STRING())
+ .column('count', DataTypes.BIGINT())
+ .build())
+ .option('path', output_path)
+ .format(FormatDescriptor.for_format('canal-json')
+ .build())
.build())
- .build())
```
-You can also use the TableEnvironment.sql_update() method to register a source/sink table defined in DDL:
+You can also use the TableEnvironment.execute_sql() method to register a source/sink table defined in DDL:
```python
my_source_ddl = """
- create table mySource (
- word VARCHAR
+ create table source (
+ word STRING
) with (
'connector' = 'filesystem',
'format' = 'csv',
- 'path' = '/tmp/input'
+ 'path' = '{}'
)
-"""
+""".format(input_path)
my_sink_ddl = """
- create table mySink (
- word VARCHAR,
+ create table sink (
+ word STRING,
`count` BIGINT
) with (
'connector' = 'filesystem',
- 'format' = 'csv',
- 'path' = '/tmp/output'
+ 'format' = 'canal-json',
+ 'path' = '{}'
)
-"""
+""".format(output_path)
t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)
```
-This registers a table named `mySource` and a table named `mySink` in the execution environment.
-The table `mySource` has only one column, word, and it consumes strings read from file `/tmp/input`.
-The table `mySink` has two columns, word and count, and writes data to the file `/tmp/output`, with `\t` as the field delimiter.
+This registers a table named `source` and a table named `sink` in the table environment.
+The table `source` has only one column, word, and it consumes strings read from file specified by `input_path`.
+The table `sink` has two columns, word and count, and writes data to the file specified by `output_path`.
-You can now create a job which reads input from table `mySource`, performs some transformations, and writes the results to table `mySink`.
+You can now create a job which reads input from table `source`, performs some transformations, and writes the results to table `sink`.
-Finally you must execute the actual Flink Python Table API job.
+Finally, you must execute the actual Flink Python Table API job.
All operations, such as creating sources, transformations and sinks are lazy.
Only when `execute_insert(sink_name)` is called, the job will be submitted for execution.
```python
-from pyflink.table.expressions import lit
-
-tab = t_env.from_path('mySource')
-tab.group_by(tab.word) \
- .select(tab.word, lit(1).count) \
- .execute_insert('mySink').wait()
+@udtf(result_types=[DataTypes.STRING()])
+def split(line: Row):
+    for s in line[0].split():
+        yield Row(s)
+
+# compute word count
+tab.flat_map(split).alias('word') \
+   .group_by(col('word')) \
+   .select(col('word'), lit(1).count) \
+   .execute_insert('sink') \
+   .wait()
```
The complete code so far:
```python
-from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
-from pyflink.table.descriptors import Schema, FileSystem
-from pyflink.table.expressions import lit
-
-settings = EnvironmentSettings.in_batch_mode()
-t_env = TableEnvironment.create(settings)
-
-# write all the data to one file
-t_env.get_config().get_configuration().set_string("parallelism.default", "1")
-
-t_env.create_temporary_table('mySource', TableDescriptor.for_connector('filesystem')
- .schema(Schema.new_builder()
- .column('word', DataTypes.STRING())
- .build())
- .option('path', '/tmp/input')
- .format('csv')
- .build())
-
-t_env.create_temporary_table('mySink', TableDescriptor.for_connector('filesystem')
- .schema(Schema.new_builder()
- .column('word', DataTypes.STRING())
- .column('count', DataTypes.BIGINT())
- .build())
- .option('path', '/tmp/output')
- .format(FormatDescriptor.for_format('csv')
- .option('field-delimiter', '\t')
- .build())
- .build())
-
-tab = t_env.from_path('mySource')
-tab.group_by(tab.word) \
- .select(tab.word, lit(1).count) \
- .execute_insert('mySink').wait()
+import argparse
+import logging
+import sys
+
+from pyflink.common import Row
+from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
+                           DataTypes, FormatDescriptor)
+from pyflink.table.expressions import lit, col
+from pyflink.table.udf import udtf
+
+word_count_data = ["To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer",
+ "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,",
+ "And by opposing end them?--To die,--to sleep,--",
+ "No more; and by a sleep to say we end",
+ "The heartache, and the thousand natural shocks",
+ "That flesh is heir to,--'tis a consummation",
+ "Devoutly to be wish'd. To die,--to sleep;--",
+ "To sleep! perchance to dream:--ay, there's the rub;",
+ "For in that sleep of death what dreams may come,",
+ "When we have shuffled off this mortal coil,",
+ "Must give us pause: there's the respect",
+ "That makes calamity of so long life;",
+ "For who would bear the whips and scorns of time,",
+ "The oppressor's wrong, the proud man's contumely,",
+ "The pangs of despis'd love, the law's delay,",
+ "The insolence of office, and the spurns",
+ "That patient merit of the unworthy takes,",
+ "When he himself might his quietus make",
+ "With a bare bodkin? who would these fardels bear,",
+ "To grunt and sweat under a weary life,",
+ "But that the dread of something after death,--",
+ "The undiscover'd country, from whose bourn",
+ "No traveller returns,--puzzles the will,",
+ "And makes us rather bear those ills we have",
+ "Than fly to others that we know not of?",
+ "Thus conscience does make cowards of us all;",
+ "And thus the native hue of resolution",
+ "Is sicklied o'er with the pale cast of thought;",
+ "And enterprises of great pith and moment,",
+ "With this regard, their currents turn awry,",
+ "And lose the name of action.--Soft you now!",
+ "The fair Ophelia!--Nymph, in thy orisons",
+ "Be all my sins remember'd."]
+
+
+def word_count(input_path, output_path):
+    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
+    # write all the data to one file
+    t_env.get_config().get_configuration().set_string("parallelism.default", "1")
+
+    # define the source
+    if input_path is not None:
+        t_env.create_temporary_table(
+            'source',
+            TableDescriptor.for_connector('filesystem')
+                           .schema(Schema.new_builder()
+                                   .column('word', DataTypes.STRING())
+                                   .build())
+                           .option('path', input_path)
+                           .format('csv')
+                           .build())
+        tab = t_env.from_path('source')
+    else:
+        print("Executing word_count example with default input data set.")
+        print("Use --input to specify file input.")
+        tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
+                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))
+
+    # define the sink
+    if output_path is not None:
+        t_env.create_temporary_table(
+            'sink',
+            TableDescriptor.for_connector('filesystem')
+                           .schema(Schema.new_builder()
+                                   .column('word', DataTypes.STRING())
+                                   .column('count', DataTypes.BIGINT())
+                                   .build())
+                           .option('path', output_path)
+                           .format(FormatDescriptor.for_format('canal-json')
+                                   .build())
+                           .build())
+    else:
+        print("Printing result to stdout. Use --output to specify output path.")
+        t_env.create_temporary_table(
+            'sink',
+            TableDescriptor.for_connector('print')
+                           .schema(Schema.new_builder()
+                                   .column('word', DataTypes.STRING())
+                                   .column('count', DataTypes.BIGINT())
+                                   .build())
+                           .build())
+
+    @udtf(result_types=[DataTypes.STRING()])
+    def split(line: Row):
+        for s in line[0].split():
+            yield Row(s)
+
+    # compute word count
+    tab.flat_map(split).alias('word') \
+       .group_by(col('word')) \
+       .select(col('word'), lit(1).count) \
+       .execute_insert('sink') \
+       .wait()
+    # remove .wait if submitting to a remote cluster, refer to
+    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
+    # for more details
+
+
+if __name__ == '__main__':
+    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--input',
+        dest='input',
+        required=False,
+        help='Input file to process.')
+    parser.add_argument(
+        '--output',
+        dest='output',
+        required=False,
+        help='Output file to write results to.')
+
+    argv = sys.argv[1:]
+    known_args, _ = parser.parse_known_args(argv)
+
+    word_count(known_args.input, known_args.output)
```
## Executing a Flink Python Table API Program
-Firstly, you need to prepare input data in the "/tmp/input" directory. You can choose the following command line to prepare the input data:
-
-```bash
-$ mkdir /tmp/input
-$ echo -e "flink\npyflink\nflink" > /tmp/input/input.csv
-```
-Next, you can run this example on the command line (Note: if the output directory "/tmp/output" has already existed, you need to remove the file before running the example):
+You can run this example on the command line:
```bash
-$ python WordCount.py
+$ python word_count.py
```
The command builds and runs the Python Table API program in a local mini cluster.
@@ -202,14 +298,17 @@ You can also submit the Python Table API program to a remote cluster, you can re
[Job Submission Examples]({{< ref "docs/deployment/cli" >}}#submitting-pyflink-jobs)
for more details.
-Finally, you can see the execution result on the command line:
+Finally, you can see the execution results similar to the following:
```bash
-$ cat /tmp/output/*
-flink 2
-pyflink 1
++I[To, 1]
++I[be,, 1]
++I[or, 1]
++I[not, 1]
+...
```
This should get you started with writing your own Flink Python Table API programs.
+You can also refer to {{< gh_link file="flink-python/pyflink/examples" name="PyFlink Examples" >}} for more examples.
To learn more about the Python Table API, you can refer
{{< pythondoc name="Flink Python API Docs">}} for more details.
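Editor's note, not part of the commit: the `+I[...]` rows in the sample output above are changelog entries. This is because the updated tutorial runs the group-by count in streaming mode. The sketch below is plain Python, not PyFlink, and the helper name `changelog_word_count` is hypothetical. It imitates how a running count might emit an insert (`+I`) the first time a word appears and an update pair (`-U`/`+U`) on later occurrences. It assumes whitespace splitting like the tutorial's `split` UDTF, and that both the update-before and update-after rows are emitted, which may depend on the sink.

```python
from collections import defaultdict


def changelog_word_count(lines):
    """Yield changelog-style rows for a running word count (illustration only)."""
    counts = defaultdict(int)
    for line in lines:
        # str.split() with no arguments splits on whitespace, so punctuation
        # stays attached to the word (e.g. "be," is a distinct key).
        for word in line.split():
            previous = counts[word]
            counts[word] = previous + 1
            if previous == 0:
                # first occurrence: a plain insert
                yield "+I[{}, {}]".format(word, 1)
            else:
                # later occurrence: retract the old count, emit the new one
                yield "-U[{}, {}]".format(word, previous)
                yield "+U[{}, {}]".format(word, previous + 1)


if __name__ == "__main__":
    for row in changelog_word_count(["To be, or not to be,--that is the question:--"]):
        print(row)
```

Because keys are case-sensitive and keep their punctuation, `To` and `to` are counted separately. That is consistent with the `+I[be,, 1]` row in the sample output.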