This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fd2865a [FLINK-12722][docs] Adds Python Table API tutorial
fd2865a is described below
commit fd2865a54fc0790d6774d1b645da4a65e3f5d741
Author: Dian Fu <[email protected]>
AuthorDate: Mon Jun 24 22:22:35 2019 +0800
[FLINK-12722][docs] Adds Python Table API tutorial
This closes #8907
---
docs/tutorials/python_table_api.md | 158 ++++++++++++++++++++++++++++++++++
docs/tutorials/python_table_api.zh.md | 146 +++++++++++++++++++++++++++++++
2 files changed, 304 insertions(+)
diff --git a/docs/tutorials/python_table_api.md b/docs/tutorials/python_table_api.md
new file mode 100644
index 0000000..8a6e866
--- /dev/null
+++ b/docs/tutorials/python_table_api.md
@@ -0,0 +1,158 @@
+---
+title: "Python API Tutorial"
+nav-title: Python API
+nav-parent_id: apitutorials
+nav-pos: 10
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+In this guide we will start from scratch and go from setting up a Flink Python project
+to running a Python Table API program.
+
+## Setting up a Python Project
+
+First, fire up your favorite IDE and create a Python project. Then you need to
+install the PyFlink package. Please
+see [Build PyFlink]({{ site.baseurl }}/flinkDev/building.html#build-pyflink)
+for more details.
+
+## Writing a Flink Python Table API Program
+
+The first step in a Flink Python Table API program is to create a `BatchTableEnvironment`
+(or `StreamTableEnvironment` if you are writing a streaming job). It is the main entry point
+for Python Table API jobs.
+
+{% highlight python %}
+exec_env = ExecutionEnvironment.get_execution_environment()
+exec_env.set_parallelism(1)
+t_config = TableConfig()
+t_env = BatchTableEnvironment.create(exec_env, t_config)
+{% endhighlight %}
+
+The `ExecutionEnvironment` (or `StreamExecutionEnvironment` if you are writing a streaming job)
+can be used to set execution parameters, such as the restart strategy, the default parallelism, etc.
+
+The `TableConfig` can be used to set parameters such as the built-in catalog name and the
+threshold for splitting generated code into methods.
+
+Next we will create a source table and a sink table.
+
+{% highlight python %}
+t_env.connect(FileSystem().path('/tmp/input')) \
+ .with_format(OldCsv()
+ .line_delimiter(' ')
+ .field('word', DataTypes.STRING())) \
+ .with_schema(Schema()
+ .field('word', DataTypes.STRING())) \
+ .register_table_source('mySource')
+
+t_env.connect(FileSystem().path('/tmp/output')) \
+ .with_format(OldCsv()
+ .field_delimiter('\t')
+ .field('word', DataTypes.STRING())
+ .field('count', DataTypes.BIGINT())) \
+ .with_schema(Schema()
+ .field('word', DataTypes.STRING())
+ .field('count', DataTypes.BIGINT())) \
+ .register_table_sink('mySink')
+{% endhighlight %}
+
+This registers a table named `mySource` and a table named `mySink` in the
+`ExecutionEnvironment`. The table `mySource` has a single column, `word`,
+which represents the words read from the file `/tmp/input`. The table `mySink` has two columns,
+`word` and `count`, and writes data to the file `/tmp/output`, with `\t` as the field delimiter.
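To try the pipeline end to end, the input file has to match the source format above: because `line_delimiter(' ')` is used, records are separated by spaces rather than newlines. A minimal sketch for preparing such a file (a temp directory is used here instead of `/tmp/input` purely to keep the sketch portable):

```python
import os
import tempfile

# Write a space-delimited word file, matching line_delimiter(' ') above.
input_path = os.path.join(tempfile.mkdtemp(), "input")
with open(input_path, "w") as f:
    f.write("flink pyflink flink")

# The source table would read one record per space-separated token.
with open(input_path) as f:
    records = f.read().split(" ")
print(records)  # ['flink', 'pyflink', 'flink']
```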
+
+Then we need to create a job which reads input from the table `mySource`, performs some
+operations, and writes the results to the table `mySink`.
+
+{% highlight python %}
+t_env.scan('mySource') \
+ .group_by('word') \
+ .select('word, count(1)') \
+ .insert_into('mySink')
+{% endhighlight %}
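The pipeline above expresses the classic word count. As a rough sketch of what the `group_by`/`select` combination computes, here is the same aggregation in plain Python (no Flink involved, purely for illustration):

```python
from collections import Counter

# group_by('word').select('word, count(1)') groups identical words
# and emits one (word, count) row per group.
records = ["flink", "pyflink", "flink"]
rows = sorted(Counter(records).items())
print(rows)  # [('flink', 2), ('pyflink', 1)]
```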
+
+The last step is to start the actual Flink Python Table API job. All operations, such as
+creating sources, transformations and sinks, only build up a graph of internal operations.
+Only when `exec_env.execute()` is called is this graph of operations submitted to a cluster or
+executed on your local machine.
+
+{% highlight python %}
+exec_env.execute()
+{% endhighlight %}
+
+The complete code so far is as follows:
+
+{% highlight python %}
+from pyflink.dataset import ExecutionEnvironment
+from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
+from pyflink.table.descriptors import Schema, OldCsv, FileSystem
+
+exec_env = ExecutionEnvironment.get_execution_environment()
+exec_env.set_parallelism(1)
+t_config = TableConfig()
+t_env = BatchTableEnvironment.create(exec_env, t_config)
+
+t_env.connect(FileSystem().path('/tmp/input')) \
+ .with_format(OldCsv()
+ .line_delimiter(' ')
+ .field('word', DataTypes.STRING())) \
+ .with_schema(Schema()
+ .field('word', DataTypes.STRING())) \
+ .register_table_source('mySource')
+
+t_env.connect(FileSystem().path('/tmp/output')) \
+ .with_format(OldCsv()
+ .field_delimiter('\t')
+ .field('word', DataTypes.STRING())
+ .field('count', DataTypes.BIGINT())) \
+ .with_schema(Schema()
+ .field('word', DataTypes.STRING())
+ .field('count', DataTypes.BIGINT())) \
+ .register_table_sink('mySink')
+
+t_env.scan('mySource') \
+ .group_by('word') \
+ .select('word, count(1)') \
+ .insert_into('mySink')
+
+exec_env.execute()
+{% endhighlight %}
+
+## Executing a Flink Python Table API Program
+
+You can run this example in your IDE or on the command line (assuming the job
+script is saved as WordCount.py):
+
+{% highlight bash %}
+$ python WordCount.py
+{% endhighlight %}
+
+The command builds and runs the Python Table API program in a local mini cluster.
+You can also submit the Python Table API program to a remote cluster; refer to the
+[Job Submission Examples]({{ site.baseurl }}/ops/cli.html#job-submission-examples)
+for more details.
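For remote submission, the Flink CLI also accepts Python Table API programs directly. A sketch of such an invocation (paths are illustrative; a running cluster and a Flink 1.9+ installation are assumed -- see the Job Submission Examples page for the exact options your setup supports):

```shell
# Submit the job with the Flink CLI; the -py flag designates
# a Python Table API entry-point script.
flink run -py WordCount.py
```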
+
+This should get you started with writing your own Flink Python Table API programs.
+To learn more about the Python Table API, refer to the
+[Flink Python Table API Docs]({{ site.pythondocs_baseurl }}/api/python) for
+more details.
diff --git a/docs/tutorials/python_table_api.zh.md b/docs/tutorials/python_table_api.zh.md
new file mode 100644
index 0000000..81fc598
--- /dev/null
+++ b/docs/tutorials/python_table_api.zh.md
@@ -0,0 +1,146 @@
+---
+title: "Python API Tutorial"
+nav-title: Python API
+nav-parent_id: apitutorials
+nav-pos: 10
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+In this tutorial, we will start from scratch and show how to create a Flink Python project and run a Python Table API program.
+
+## Setting up a Python Table API Project
+
+First, you can use your favorite IDE to create a Python project. Then you need to install the PyFlink package;
+see [Build PyFlink]({{ site.baseurl }}/zh/flinkDev/building.html#build-pyflink) for more details.
+
+## Writing a Flink Python Table API Program
+
+The first step in writing a Flink Python Table API program is to create a `BatchTableEnvironment`
+(or a `StreamTableEnvironment` if you are writing a streaming job). It is the entry point for Python Table API jobs.
+
+{% highlight python %}
+exec_env = ExecutionEnvironment.get_execution_environment()
+exec_env.set_parallelism(1)
+t_config = TableConfig()
+t_env = BatchTableEnvironment.create(exec_env, t_config)
+{% endhighlight %}
+
+The `ExecutionEnvironment` (or `StreamExecutionEnvironment` if you are writing a streaming job)
+can be used to set execution parameters, such as the restart strategy, the default parallelism, etc.
+
+The `TableConfig` can be used to set parameters such as the default catalog name and the threshold for splitting generated code into methods.
+
+Next, we will show how to create the source and sink tables.
+
+{% highlight python %}
+t_env.connect(FileSystem().path('/tmp/input')) \
+ .with_format(OldCsv()
+ .line_delimiter(' ')
+ .field('word', DataTypes.STRING())) \
+ .with_schema(Schema()
+ .field('word', DataTypes.STRING())) \
+ .register_table_source('mySource')
+
+t_env.connect(FileSystem().path('/tmp/output')) \
+ .with_format(OldCsv()
+ .field_delimiter('\t')
+ .field('word', DataTypes.STRING())
+ .field('count', DataTypes.BIGINT())) \
+ .with_schema(Schema()
+ .field('word', DataTypes.STRING())
+ .field('count', DataTypes.BIGINT())) \
+ .register_table_sink('mySink')
+{% endhighlight %}
+
+The program above shows how to create and register tables named `mySource` and `mySink` in the `ExecutionEnvironment`.
+The source table `mySource` has a single column, `word`, and represents the words read from the input file `/tmp/input`;
+the sink table `mySink` has two columns, `word` and `count`, and writes the results to the file `/tmp/output`, using `\t` as the field delimiter.
+
+Next, we show how to create a job that reads data from the table `mySource`, performs some transformations, and writes the results to the table `mySink`.
+
+{% highlight python %}
+t_env.scan('mySource') \
+ .group_by('word') \
+ .select('word, count(1)') \
+ .insert_into('mySink')
+{% endhighlight %}
+
+Finally, all that remains is to start the actual Flink Python Table API job. All of the operations above, such as creating the source table,
+applying transformations and writing to the sink table, only build up a logical graph of the job. Only when `exec_env.execute()` is called
+is the job actually submitted to a cluster or executed locally.
+
+{% highlight python %}
+exec_env.execute()
+{% endhighlight %}
+
+The complete code of this tutorial is as follows:
+
+{% highlight python %}
+from pyflink.dataset import ExecutionEnvironment
+from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
+from pyflink.table.descriptors import Schema, OldCsv, FileSystem
+
+exec_env = ExecutionEnvironment.get_execution_environment()
+exec_env.set_parallelism(1)
+t_config = TableConfig()
+t_env = BatchTableEnvironment.create(exec_env, t_config)
+
+t_env.connect(FileSystem().path('/tmp/input')) \
+ .with_format(OldCsv()
+ .line_delimiter(' ')
+ .field('word', DataTypes.STRING())) \
+ .with_schema(Schema()
+ .field('word', DataTypes.STRING())) \
+ .register_table_source('mySource')
+
+t_env.connect(FileSystem().path('/tmp/output')) \
+ .with_format(OldCsv()
+ .field_delimiter('\t')
+ .field('word', DataTypes.STRING())
+ .field('count', DataTypes.BIGINT())) \
+ .with_schema(Schema()
+ .field('word', DataTypes.STRING())
+ .field('count', DataTypes.BIGINT())) \
+ .register_table_sink('mySink')
+
+t_env.scan('mySource') \
+ .group_by('word') \
+ .select('word, count(1)') \
+ .insert_into('mySink')
+
+exec_env.execute()
+{% endhighlight %}
+
+## Executing a Flink Python Table API Program
+
+You can run the job in your IDE or on the command line (assuming the job script is named WordCount.py):
+
+{% highlight bash %}
+$ python WordCount.py
+{% endhighlight %}
+
+The command above builds the Python Table API program and runs it on a local mini cluster. If you want to submit the job to a remote cluster,
+refer to the [Job Submission Examples]({{ site.baseurl }}/zh/ops/cli.html#job-submission-examples).
+
+This tutorial has shown how to write and run a Flink Python Table API program. To learn more about the Flink Python Table API,
+refer to the [Flink Python Table API Docs]({{ site.pythondocs_baseurl }}/zh/api/python).