This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new c2148b1  [FLINK-28364] Add Python Job example using Kubernetes Operator
c2148b1 is described below

commit c2148b194a124a1070d9f3a41bf72033c1426c15
Author: Biao Geng <[email protected]>
AuthorDate: Wed Jul 13 10:01:57 2022 +0800

    [FLINK-28364] Add Python Job example using Kubernetes Operator
    
    This closes #306.
---
 examples/flink-python-example/Dockerfile          | 44 ++++++++++++
 examples/flink-python-example/README.md           | 83 +++++++++++++++++++++++
 examples/flink-python-example/python-example.yaml | 42 ++++++++++++
 examples/flink-python-example/python_demo.py      | 49 +++++++++++++
 4 files changed, 218 insertions(+)

diff --git a/examples/flink-python-example/Dockerfile b/examples/flink-python-example/Dockerfile
new file mode 100644
index 0000000..bc684b0
--- /dev/null
+++ b/examples/flink-python-example/Dockerfile
@@ -0,0 +1,44 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# Check https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker for more details
+FROM flink:1.15.0
+
+# Install Python 3: Debian 11 ships Python 3.9, but PyFlink 1.15 officially supports only
+# Python 3.6, 3.7 and 3.8, so build Python 3.7 from source.
+
+RUN apt-get update -y && \
+apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \
+wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
+tar -xvf Python-3.7.9.tgz && \
+cd Python-3.7.9 && \
+./configure --without-tests --enable-shared && \
+make -j6 && \
+make install && \
+ldconfig /usr/local/lib && \
+cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
+ln -s /usr/local/bin/python3 /usr/local/bin/python && \
+apt-get clean && \
+rm -rf /var/lib/apt/lists/*
+
+# install PyFlink
+RUN pip3 install apache-flink==1.15.0
+
+# add python script
+USER flink
+RUN mkdir /opt/flink/usrlib
+ADD python_demo.py /opt/flink/usrlib/python_demo.py
diff --git a/examples/flink-python-example/README.md b/examples/flink-python-example/README.md
new file mode 100644
index 0000000..9473658
--- /dev/null
+++ b/examples/flink-python-example/README.md
@@ -0,0 +1,83 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink Kubernetes Operator Python Example
+
+## Overview
+
+This is an end-to-end example of running Flink Python jobs using the Flink Kubernetes Operator.
+
+
+*What's in this example?*
+
+ 1. Python script of a simple streaming job
+ 2. Dockerfile to build a custom image with PyFlink and the Python demo
+ 3. Example YAML for submitting the Python job using the operator
+
+## How does it work?
+
+Flink supports Python jobs in application mode by using the `org.apache.flink.client.python.PythonDriver` class as the
+entry class. With the Flink Kubernetes Operator, we can reuse this class to run Python jobs as well.
+
+The class is packaged in flink-python_${scala_version}-${flink_version}.jar, which is in the default Flink image.
+So we do not need to create a new job jar. Instead, we just set `entryClass` of the job CRD to
+`org.apache.flink.client.python.PythonDriver`. After applying the job YAML, the launched JobManager pod will run the `main()`
+method of PythonDriver and parse the arguments declared in the `args` field of the job CRD.
+
+Note that in the `args` field, users must specify either the `-py` option or the `-pym` option.
+The order of elements in the `args` field also matters: due to the current parsing process, Flink-specific options (e.g. `-pyfs`, `-py`) must be placed first and
+user-defined arguments must be placed at the end. Check the [doc](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/cli/#submitting-pyflink-jobs) for more details about PyFlink arguments.
+
+A working example would be:
+```yaml
+args: ["-pyfs", "/opt/flink/usrlib/pythonjob/python_demo.py", "-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pythonjob/python_demo.py", "-myarg", "123"]
+```
+But the following will throw an exception:
+```yaml
+args: ["-myarg", "123", "-pyfs", "/opt/flink/usrlib/pythonjob/python_demo.py", "-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pythonjob/python_demo.py"]
+```
+
+## Usage
+
+The following steps assume that you have the Flink Kubernetes Operator installed and running in your environment.
+
+
+**Step 1**: Put your Python script files under the `flink-python-example` directory and add them to the
+Dockerfile
+
+**Step 2**: Build docker image
+
+Check this [doc](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker) for more details about building a PyFlink image. Note that PyFlink 1.15.0 is only supported on the x86 architecture.
+```bash
+# Uncomment when building for local minikube env:
+# eval $(minikube docker-env)
+
+DOCKER_BUILDKIT=1 docker build . -t flink-python-example:latest
+```
+This step will create an image, based on an official Flink base image, that includes the Python scripts.
+
+**Step 3**: Create FlinkDeployment YAML and Submit
+
+Edit the included `python-example.yaml` so that the `job.args` section points to the Python script that you wish to execute, then submit it.
+
+```bash
+kubectl apply -f python-example.yaml
+```
+
+It is possible to reuse the above image for different Python scripts as long as users make them accessible on the JobManager pod (e.g. using a PodTemplate with mounted storage).
diff --git a/examples/flink-python-example/python-example.yaml b/examples/flink-python-example/python-example.yaml
new file mode 100644
index 0000000..8540fcd
--- /dev/null
+++ b/examples/flink-python-example/python-example.yaml
@@ -0,0 +1,42 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: python-example
+spec:
+  image: flink-python-example:latest
+  flinkVersion: v1_15
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "1"
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  job:
+    jarURI: local:///opt/flink/opt/flink-python_2.12-1.15.0.jar # Note, this jarURI is actually a placeholder
+    entryClass: "org.apache.flink.client.python.PythonDriver"
+    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/python_demo.py"]
+    parallelism: 1
+    upgradeMode: stateless
diff --git a/examples/flink-python-example/python_demo.py b/examples/flink-python-example/python_demo.py
new file mode 100644
index 0000000..f877f94
--- /dev/null
+++ b/examples/flink-python-example/python_demo.py
@@ -0,0 +1,49 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import logging
+import sys
+
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+
+def python_demo():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    env.set_parallelism(1)
+
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+    t_env.execute_sql("""
+    CREATE TABLE orders (
+      order_number BIGINT,
+      price        DECIMAL(32,2),
+      buyer        ROW<first_name STRING, last_name STRING>,
+      order_time   TIMESTAMP(3)
+    ) WITH (
+      'connector' = 'datagen'
+    )""")
+
+    t_env.execute_sql("""
+        CREATE TABLE print_table WITH ('connector' = 'print')
+          LIKE orders""")
+    t_env.execute_sql("""
+        INSERT INTO print_table SELECT * FROM orders""")
+
+
+if __name__ == '__main__':
+    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
+    python_demo()
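
The ordering rule for `args` stated in the README above (Flink-specific options first, user-defined arguments last) can be sketched as a small Python helper. This is an illustration only and not part of the commit: `build_job_args` and its parameter names are hypothetical, and the sketch assumes that `-pyfs` accepts a comma-separated list of files, as in the Flink CLI.

```python
import json
from typing import List, Optional


def build_job_args(entry_script: str,
                   python_exec: Optional[str] = None,
                   py_files: Optional[List[str]] = None,
                   user_args: Optional[List[str]] = None) -> List[str]:
    """Hypothetical helper: build the `job.args` list in the required order."""
    args: List[str] = []
    # Flink-specific options must come first.
    if py_files:
        # Assumption: -pyfs takes a comma-separated list of files.
        args += ["-pyfs", ",".join(py_files)]
    if python_exec:
        args += ["-pyclientexec", python_exec]
    # Entry script (the README requires either -py or -pym; only -py is covered here).
    args += ["-py", entry_script]
    # User-defined arguments must come last.
    args += list(user_args or [])
    return args


if __name__ == "__main__":
    # Prints a JSON list, which is also a valid YAML flow sequence for `job.args`.
    print(json.dumps(build_job_args(
        "/opt/flink/usrlib/python_demo.py",
        python_exec="/usr/local/bin/python3",
        user_args=["-myarg", "123"],
    )))
```

Because the helper always appends `user_args` after `-py`, it cannot produce the failing layout in which `-myarg` precedes the Flink options.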
