This is an automated email from the ASF dual-hosted git repository.
wangyang0918 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new c2148b1 [FLINK-28364] Add Python Job example using Kubernetes Operator
c2148b1 is described below
commit c2148b194a124a1070d9f3a41bf72033c1426c15
Author: Biao Geng <[email protected]>
AuthorDate: Wed Jul 13 10:01:57 2022 +0800
[FLINK-28364] Add Python Job example using Kubernetes Operator
This closes #306.
---
examples/flink-python-example/Dockerfile | 44 ++++++++++++
examples/flink-python-example/README.md | 83 +++++++++++++++++++++++
examples/flink-python-example/python-example.yaml | 42 ++++++++++++
examples/flink-python-example/python_demo.py | 49 +++++++++++++
4 files changed, 218 insertions(+)
diff --git a/examples/flink-python-example/Dockerfile b/examples/flink-python-example/Dockerfile
new file mode 100644
index 0000000..bc684b0
--- /dev/null
+++ b/examples/flink-python-example/Dockerfile
@@ -0,0 +1,44 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# Check https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker for more details
+FROM flink:1.15.0
+
+# Install Python 3: Debian 11 ships Python 3.9, but PyFlink officially supports only Python 3.6, 3.7 and 3.8,
+# so Python 3.7 is built and installed from source here.
+
+RUN apt-get update -y && \
+apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \
+wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
+tar -xvf Python-3.7.9.tgz && \
+cd Python-3.7.9 && \
+./configure --without-tests --enable-shared && \
+make -j6 && \
+make install && \
+ldconfig /usr/local/lib && \
+cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
+ln -s /usr/local/bin/python3 /usr/local/bin/python && \
+apt-get clean && \
+rm -rf /var/lib/apt/lists/*
+
+# install PyFlink
+RUN pip3 install apache-flink==1.15.0
+
+# add python script
+USER flink
+RUN mkdir /opt/flink/usrlib
+ADD python_demo.py /opt/flink/usrlib/python_demo.py
diff --git a/examples/flink-python-example/README.md b/examples/flink-python-example/README.md
new file mode 100644
index 0000000..9473658
--- /dev/null
+++ b/examples/flink-python-example/README.md
@@ -0,0 +1,83 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink Kubernetes Operator Python Example
+
+## Overview
+
+This is an end-to-end example of running Flink Python jobs using the Flink Kubernetes Operator.
+
+
+*What's in this example?*
+
+ 1. A Python script for a simple streaming job
+ 2. A Dockerfile that builds a custom image with PyFlink and the Python demo
+ 3. An example YAML for submitting the Python job using the operator
+
+## How does it work?
+
+Flink supports Python jobs in application mode by using the `org.apache.flink.client.python.PythonDriver` class as the
+entry class. With the Flink Kubernetes Operator, we can reuse this class to run Python jobs as well.
+
+The class is packaged in `flink-python_${scala_version}-${flink_version}.jar`, which is in the default Flink image,
+so we do not need to create a new job jar. Instead, we just set the `entryClass` of the job CRD to
+`org.apache.flink.client.python.PythonDriver`. After the job YAML is applied, the launched JobManager pod runs the `main()`
+method of `PythonDriver`, which parses the arguments declared in the `args` field of the job CRD.
+
+Note that the `args` field must specify either the `-py` option or the `-pym` option.
+The order of elements in `args` also matters: because of the current parsing process, Flink-specific options (e.g. `-pyfs`, `-py`) must be placed first and
+user-defined arguments must be placed at the end. Check the [doc](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/cli/#submitting-pyflink-jobs) for more details about PyFlink arguments.
+
+A working example would be:
+```yaml
+args: ["-pyfs", "/opt/flink/usrlib/pythonjob/python_demo.py", "-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pythonjob/python_demo.py", "-myarg", "123"]
+```
+But the following will throw an exception, because a user-defined argument comes before the Flink-specific options:
+```yaml
+args: ["-myarg", "123", "-pyfs", "/opt/flink/usrlib/pythonjob/python_demo.py", "-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pythonjob/python_demo.py"]
+```
+
+## Usage
+
+The following steps assume that you have the Flink Kubernetes Operator installed and running in your environment.
+
+
+**Step 1**: Put your Python script files under the `flink-python-example` directory and add them to the image in the
+Dockerfile
+
+**Step 2**: Build the Docker image
+
+Check this [doc](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker) for more details about building a PyFlink image. Note that PyFlink 1.15.0 is only supported on the x86 architecture.
+```bash
+# Uncomment when building for local minikube env:
+# eval $(minikube docker-env)
+
+DOCKER_BUILDKIT=1 docker build . -t flink-python-example:latest
+```
+This step creates an image that is based on an official Flink base image and includes the Python scripts.
+
+**Step 3**: Create the FlinkDeployment YAML and submit it
+
+Edit the included `python-example.yaml` so that the `job.args` section points to the Python script that you wish to execute, then submit it.
+
+```bash
+kubectl apply -f python-example.yaml
+```
+
+It is possible to reuse the above image for different Python scripts as long as users make them accessible on the JobManager pod (e.g. using a PodTemplate with mounted storage).
diff --git a/examples/flink-python-example/python-example.yaml b/examples/flink-python-example/python-example.yaml
new file mode 100644
index 0000000..8540fcd
--- /dev/null
+++ b/examples/flink-python-example/python-example.yaml
@@ -0,0 +1,42 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: python-example
+spec:
+  image: flink-python-example:latest
+  flinkVersion: v1_15
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "1"
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  job:
+    jarURI: local:///opt/flink/opt/flink-python_2.12-1.15.0.jar # Note, this jarURI is actually a placeholder
+    entryClass: "org.apache.flink.client.python.PythonDriver"
+    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/python_demo.py"]
+    parallelism: 1
+    upgradeMode: stateless
diff --git a/examples/flink-python-example/python_demo.py b/examples/flink-python-example/python_demo.py
new file mode 100644
index 0000000..f877f94
--- /dev/null
+++ b/examples/flink-python-example/python_demo.py
@@ -0,0 +1,49 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import logging
+import sys
+
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+
+def python_demo():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    env.set_parallelism(1)
+
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+    t_env.execute_sql("""
+    CREATE TABLE orders (
+      order_number BIGINT,
+      price DECIMAL(32,2),
+      buyer ROW<first_name STRING, last_name STRING>,
+      order_time TIMESTAMP(3)
+    ) WITH (
+      'connector' = 'datagen'
+    )""")
+
+    t_env.execute_sql("""
+    CREATE TABLE print_table WITH ('connector' = 'print')
+      LIKE orders""")
+    t_env.execute_sql("""
+    INSERT INTO print_table SELECT * FROM orders""")
+
+
+if __name__ == '__main__':
+    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
+    python_demo()
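
The argument-ordering rule described in the README (Flink-specific options first, user-defined arguments last) can be illustrated with a small standalone check. This is only a sketch and not part of the commit: the helper name `misplaced_flink_options` is hypothetical, the option set is a partial list taken from the PyFlink CLI options, and the sketch assumes every listed option takes exactly one value.

```python
from typing import List, Sequence

# Assumption: a partial set of Flink-specific Python options; each takes one value.
FLINK_PYTHON_OPTIONS = {
    "-py", "-pym", "-pyfs", "-pyclientexec", "-pyexec", "-pyarch", "-pyreq",
}


def misplaced_flink_options(args: Sequence[str]) -> List[str]:
    """Return the Flink-specific options that appear after a user-defined argument."""
    seen_user_arg = False
    misplaced = []
    i = 0
    while i < len(args):
        token = args[i]
        if token in FLINK_PYTHON_OPTIONS:
            if seen_user_arg:
                misplaced.append(token)
            i += 2  # skip the option and its value
        else:
            # Anything else is treated as a user-defined argument.
            seen_user_arg = True
            i += 1
    return misplaced


script = "/opt/flink/usrlib/pythonjob/python_demo.py"
good_args = ["-pyfs", script, "-pyclientexec", "/usr/local/bin/python3",
             "-py", script, "-myarg", "123"]
bad_args = ["-myarg", "123", "-pyfs", script, "-pyclientexec",
            "/usr/local/bin/python3", "-py", script]

print(misplaced_flink_options(good_args))
print(misplaced_flink_options(bad_args))
```

Running such a check on the `args` list before `kubectl apply` would flag the failing README example locally. It does not replicate the actual parsing done by `PythonDriver`.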