This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new e807c0e9f [Feature] Yarn supports pyflink (#2956)
e807c0e9f is described below

commit e807c0e9f2f8c3507ae35870f715449928ad5de7
Author: ChengJie1053 <[email protected]>
AuthorDate: Fri Sep 1 21:59:56 2023 +0800

    [Feature] Yarn supports pyflink (#2956)
    
    * yarn application supports pyflink
    
    * yarn perjob supports pyflink
    
    * pyflink supports flink connector
---
 .../streampark/common/conf/ConfigConst.scala       |  8 ++
 .../apache/streampark/common/conf/Workspace.scala  |  2 +
 .../apache/streampark/common/util/EnvUtils.java    | 41 ++++++++++
 .../src/main/assembly/assembly.xml                 |  5 ++
 .../src/main/assembly/python/.gitkeep              |  0
 .../core/service/impl/ApplicationServiceImpl.java  |  9 +++
 .../core/service/impl/ResourceServiceImpl.java     |  4 +
 .../src/locales/lang/en/flink/app.ts               |  2 +-
 .../src/locales/lang/zh-CN/flink/app.ts            |  2 +-
 .../views/flink/app/components/UploadJobJar.vue    |  2 +-
 .../flink/client/bean/SubmitRequest.scala          |  1 +
 .../streampark-flink-client-core/pom.xml           |  7 ++
 .../src/main/resources/pyflink.md                  | 87 ++++++++++++++++++++++
 .../flink/client/impl/YarnApplicationClient.scala  | 45 ++++++++++-
 .../flink/client/trait/FlinkClientTrait.scala      | 65 +++++++++++++---
 .../apache/streampark/flink/util/FlinkUtils.scala  | 16 ++++
 16 files changed, 280 insertions(+), 16 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 2d6b74fba..6c5c6a6d5 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -26,6 +26,14 @@ object ConfigConst {
 
   val PARAM_PREFIX = "--"
 
+  /** pyflink */
+
+  val PYTHON_SUFFIX = ".py"
+
+  val PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver"
+
+  val PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3"
+
   /** about parameter... */
 
   val KEY_APP_HOME = "app.home"
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
index 03b0c9682..d914780a1 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
@@ -102,6 +102,8 @@ case class Workspace(storageType: StorageType) {
 
   lazy val APP_UPLOADS = s"$WORKSPACE/uploads"
 
+  lazy val APP_PYTHON_VENV = s"$WORKSPACE/python/venv.zip"
+
   lazy val APP_WORKSPACE = s"$WORKSPACE/workspace"
 
   lazy val APP_FLINK = s"$WORKSPACE/flink"
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java
new file mode 100644
index 000000000..a8fd54be3
--- /dev/null
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.common.util;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+
+public class EnvUtils {
+    public static void setEnv(String name, String value) throws Exception {
+        getModifiableEnvironment().put(name, value);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Map<String, String> getModifiableEnvironment() throws 
Exception {
+        Class<?> pe = Class.forName("java.lang.ProcessEnvironment");
+        Method getenv = pe.getDeclaredMethod("getenv");
+        getenv.setAccessible(true);
+        Object unmodifiableEnvironment = getenv.invoke(null);
+        Class<?> map = Class.forName("java.util.Collections$UnmodifiableMap");
+        Field m = map.getDeclaredField("m");
+        m.setAccessible(true);
+        return (Map<String, String>) m.get(unmodifiableEnvironment);
+    }
+}
+
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml 
b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
index aa4677c4a..5161835c0 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
@@ -69,6 +69,11 @@
             <outputDirectory>logs</outputDirectory>
             <fileMode>0755</fileMode>
         </fileSet>
+        <fileSet>
+            
<directory>${project.build.directory}/../src/main/assembly/python</directory>
+            <outputDirectory>python</outputDirectory>
+            <fileMode>0755</fileMode>
+        </fileSet>
         <fileSet>
             
<directory>${project.build.directory}/../src/main/assembly/temp</directory>
             <outputDirectory>temp</outputDirectory>
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/python/.gitkeep
 
b/streampark-console/streampark-console-service/src/main/assembly/python/.gitkeep
new file mode 100644
index 000000000..e69de29bb
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 99a28a87c..9e970408f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1559,6 +1559,14 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     String applicationArgs =
         variableService.replaceVariable(application.getTeamId(), 
application.getArgs());
 
+    String pyflinkFilePath = "";
+    Resource resource =
+        resourceService.findByResourceName(application.getTeamId(), 
application.getJar());
+    if (resource != null
+        && StringUtils.isNotBlank(resource.getFilePath())
+        && resource.getFilePath().endsWith(ConfigConst.PYTHON_SUFFIX())) {
+      pyflinkFilePath = resource.getFilePath();
+    }
     SubmitRequest submitRequest =
         new SubmitRequest(
             flinkEnv.getFlinkVersion(),
@@ -1574,6 +1582,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             getSavePointed(appParam),
             appParam.getRestoreMode() == null ? null : 
RestoreMode.of(appParam.getRestoreMode()),
             applicationArgs,
+            pyflinkFilePath,
             buildResult,
             kubernetesSubmitParam,
             extraParameter);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index 5e667d5b7..40314bca5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.Utils;
@@ -286,6 +287,9 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
           resp.put("exception", Utils.stringifyException(e));
           return RestResponse.success().data(resp);
         }
+        if (jarFile.getName().endsWith(ConfigConst.PYTHON_SUFFIX())) {
+          return RestResponse.success().data(resp);
+        }
         Manifest manifest = Utils.getJarManifest(jarFile);
         String mainClass = manifest.getMainAttributes().getValue("Main-Class");
 
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index df9ee4a81..f9fd462d6 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -62,7 +62,7 @@ export default {
   programJar: 'Program Jar',
   dynamicProperties: 'Dynamic Properties',
   hadoopConfigTitle: 'System Hadoop Configuration',
-  dragUploadTitle: 'Click or drag jar to this area to upload',
+  dragUploadTitle: 'Click or drag jar or py to this area to upload',
   dragUploadTip:
     'Support for a single upload. You can upload a local jar here to support 
for current Job',
   dependencyError: 'please set flink version first.',
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index bedd7fbb6..51f72ceb4 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -62,7 +62,7 @@ export default {
   programJar: '程序Jar文件',
   dynamicProperties: '动态参数',
   hadoopConfigTitle: '系统 Hadoop',
-  dragUploadTitle: '单击或拖动 jar 到此区域以上传',
+  dragUploadTitle: '单击或拖动 jar或py 到此区域以上传',
   dragUploadTip: '支持单次上传。您可以在此处上传本地 jar 以支持当前作业',
   dependencyError: '请先检查flink 版本.',
   status: '运行状态',
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/UploadJobJar.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/UploadJobJar.vue
index d84913e40..ba8f12020 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/UploadJobJar.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/UploadJobJar.vue
@@ -53,7 +53,7 @@
   /* Callback before file upload */
   function handleBeforeUpload(file) {
     if (file.type !== 'application/java-archive') {
-      if (!/\.(jar|JAR)$/.test(file.name)) {
+      if (!/\.(jar|JAR|py)$/.test(file.name)) {
         emit('update:loading', false);
         createMessage.error('Only jar files can be uploaded! please check your 
file.');
         return false;
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 5e627add8..575d6eea9 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -63,6 +63,7 @@ case class SubmitRequest(
     savePoint: String,
     restoreMode: RestoreMode,
     args: String,
+    pyflinkFilePath: String = "",
     @Nullable buildResult: BuildResult,
     @Nullable k8sSubmitParam: KubernetesSubmitParam,
     @Nullable extraParameter: JavaMap[String, Any]) {
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
index adadd682e..9e942fc1f 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
@@ -64,6 +64,13 @@
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-python_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client-api</artifactId>
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/resources/pyflink.md
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/resources/pyflink.md
new file mode 100644
index 000000000..385ce5d4a
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/resources/pyflink.md
@@ -0,0 +1,87 @@
+
+### 1. Create a Python virtual environment on Linux
+
+#### 1.1. Prepare the 'setup-pyflink-virtual-env.sh' script. The default 
version of 'apache-flink' is '1.16.2'. You can change the version as 
required. The content is as follows
+
+```shell
+set -e
+# 下载Python 3.7 miniconda.sh脚本。
+wget 
"https://repo.continuum.io/miniconda/Miniconda3-py37_4.9.2-Linux-x86_64.sh" -O 
"miniconda.sh"
+
+# 为Python 3.7 miniconda.sh脚本添加执行权限。
+chmod +x miniconda.sh
+
+# 创建Python的虚拟环境。
+./miniconda.sh -b -p venv
+
+# 激活Conda Python虚拟环境。
+source venv/bin/activate ""
+
+# 安装PyFlink依赖。
+pip install "apache-flink==1.16.2"
+
+# 退出Conda Python虚拟环境。
+conda deactivate
+
+# 删除缓存的包。
+rm -rf venv/pkgs
+
+# 将准备好的Conda Python虚拟环境打包。
+zip -r venv.zip venv
+```
+
+#### 1.2. Prepare the 'build.sh' script. The content is as follows
+
+The '/build' directory needs to be created manually. You can also change it 
to another directory
+
+```shell
+#!/bin/bash
+set -e -x
+yum install -y zip wget
+
+cd /root/
+bash /build/setup-pyflink-virtual-env.sh
+mv venv.zip /build/
+```
+
+#### 1.3. Execute the following command
+
+```shell
+docker run -it --rm -v $PWD:/build  -w /build 
quay.io/pypa/manylinux2014_x86_64 ./build.sh
+```
+After this command is executed, a file named venv.zip will be generated, which 
is the virtual environment of Python 3.7. You can also modify the above script, 
install another version of the Python virtual environment, or install the 
required third-party Python packages in the virtual environment.
+
+### 2. Upload venv.zip to hdfs
+
+venv.zip is about 539 MB in size and needs to be uploaded to HDFS manually
+
+```shell
+hadoop fs -put ./venv.zip /streampark/python
+```
+
+### 3. Copy venv.zip to $WORKSPACE/python
+
+```shell
+cp ./venv.zip $WORKSPACE/python
+```
+
+### 4. Copy Python dependencies to $FLINK_HOME/lib
+
+```shell
+cp -r $FLINK_HOME/opt/python  $FLINK_HOME/lib
+
+cp  $FLINK_HOME/opt/flink-python-* $FLINK_HOME/lib
+```
+
+### 5. If you use a flink connector dependency in your pyflink job, you need 
to put it in $FLINK_HOME/lib
+
+### 6. Reference document
+```text
+https://help.aliyun.com/document_detail/413966.html#:~:text=.%2Fsetup-pyflink-%20virtual%20-env.sh,%E8%AF%A5%E5%91%BD%E4%BB%A4%E6%89%A7%E8%A1%8C%E5%AE%8C%E6%88%90%E5%90%8E%EF%BC%8C%E4%BC%9A%E7%94%9F%E6%88%90%E4%B8%80%E4%B8%AA%E5%90%8D%E4%B8%BA%20venv%20%E7%9A%84%E7%9B%AE%E5%BD%95%EF%BC%8C%E5%8D%B3%E4%B8%BAPython%203.6%E7%9A%84%E8%99%9A%E6%8B%9F%E7%8E%AF%E5%A2%83%E3%80%82
+
+https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/deployment/cli/#submitting-pyflink-jobs
+
+https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/python/python_config/
+```
+
+
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 304735e2d..9c83659d0 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -17,23 +17,29 @@
 
 package org.apache.streampark.flink.client.impl
 
-import org.apache.streampark.common.conf.Workspace
+import org.apache.streampark.common.conf.{ConfigConst, Workspace}
 import org.apache.streampark.common.enums.DevelopmentMode
+import org.apache.streampark.common.fs.FsOperator
 import org.apache.streampark.common.util.{HdfsUtils, Utils}
 import org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
+import org.apache.streampark.flink.util.FlinkUtils
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration._
+import org.apache.flink.python.PythonOptions
 import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
 import org.apache.flink.runtime.util.HadoopUtils
 import org.apache.flink.yarn.configuration.YarnConfigOptions
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.ApplicationId
 
+import java.io.File
+import java.util
 import java.util.Collections
 import java.util.concurrent.Callable
 
@@ -93,6 +99,43 @@ object YarnApplicationClient extends YarnClientTrait {
       // yarn application Type
       .safeSet(YarnConfigOptions.APPLICATION_TYPE, 
submitRequest.applicationType.getName)
 
+    if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) {
+      val pythonVenv: String = workspace.APP_PYTHON_VENV
+      if (!FsOperator.hdfs.exists(pythonVenv)) {
+        throw new RuntimeException(s"$pythonVenv File does not exist")
+      }
+      val pyflinkFile: File = new File(submitRequest.pyflinkFilePath)
+
+      val argList = new util.ArrayList[String]()
+      argList.add("-pym")
+      argList.add(pyflinkFile.getName.replace(ConfigConst.PYTHON_SUFFIX, ""))
+
+      val pythonFlinkconnectorJars: String =
+        
FlinkUtils.getPythonFlinkconnectorJars(submitRequest.flinkVersion.flinkHome)
+      if (StringUtils.isNotBlank(pythonFlinkconnectorJars)) {
+        flinkConfig.setString(PipelineOptions.JARS.key(), 
pythonFlinkconnectorJars)
+      }
+
+      // yarn.ship-files
+      flinkConfig.setString(
+        YarnConfigOptions.SHIP_FILES.key(),
+        pyflinkFile.getParentFile.getAbsolutePath)
+
+      flinkConfig
+        // python.archives
+        .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
+        // python.client.executable
+        .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, 
ConfigConst.PYTHON_EXECUTABLE)
+        // python.executable
+        .safeSet(PythonOptions.PYTHON_EXECUTABLE, 
ConfigConst.PYTHON_EXECUTABLE)
+        // python.files
+        .safeSet(PythonOptions.PYTHON_FILES, pyflinkFile.getParentFile.getName)
+        .safeSet(
+          ApplicationConfiguration.APPLICATION_MAIN_CLASS,
+          ConfigConst.PYTHON_DRIVER_CLASS_NAME)
+        .safeSet(ApplicationConfiguration.APPLICATION_ARGS, argList)
+    }
+
     logInfo(s"""
                
|------------------------------------------------------------------
                |Effective submit configuration: $flinkConfig
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 6ff949bd2..3a179d8bc 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -17,13 +17,15 @@
 
 package org.apache.streampark.flink.client.`trait`
 
+import org.apache.streampark.common.conf.{ConfigConst, Workspace}
 import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, 
ExecutionMode, RestoreMode}
-import org.apache.streampark.common.util.{DeflaterUtils, Logger}
+import org.apache.streampark.common.fs.FsOperator
+import org.apache.streampark.common.util.{DeflaterUtils, EnvUtils, Logger}
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.core.conf.FlinkRunOption
+import org.apache.streampark.flink.util.FlinkUtils
 
 import com.google.common.collect.Lists
 import org.apache.commons.cli.{CommandLine, Options}
@@ -35,6 +37,7 @@ import 
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.client.program.{ClusterClient, PackagedProgram, 
PackagedProgramUtils}
 import org.apache.flink.configuration._
+import org.apache.flink.python.PythonOptions
 import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
 import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull
@@ -141,6 +144,17 @@ trait FlinkClientTrait extends Logger {
           })
     }
 
+    if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) {
+      val flinkOptPath: String = 
System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR)
+      if (StringUtils.isBlank(flinkOptPath)) {
+        logWarn(s"Get environment variable 
${ConfigConstants.ENV_FLINK_OPT_DIR} fail")
+        val flinkHome = submitRequest.flinkVersion.flinkHome
+        EnvUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, s"$flinkHome/opt");
+        logInfo(
+          s"Set temporary environment variables 
${ConfigConstants.ENV_FLINK_OPT_DIR} = $flinkHome/opt")
+      }
+    }
+
     setConfig(submitRequest, flinkConfig)
 
     doSubmit(submitRequest, flinkConfig)
@@ -223,16 +237,43 @@ trait FlinkClientTrait extends Logger {
       flinkConfig: Configuration,
       submitRequest: SubmitRequest,
       jarFile: File): (PackagedProgram, JobGraph) = {
-    val packageProgram = PackagedProgram.newBuilder
-      .setJarFile(jarFile)
-      .setEntryPointClassName(
-        
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
-      .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
-      .setArguments(
-        flinkConfig
-          .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
-          .orElse(Lists.newArrayList()): _*)
-      .build()
+    var packageProgram: PackagedProgram = null
+    if (StringUtils.isNotBlank(submitRequest.pyflinkFilePath)) {
+      val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
+      if (!FsOperator.lfs.exists(pythonVenv)) {
+        throw new RuntimeException(s"$pythonVenv File does not exist")
+      }
+      flinkConfig
+        // python.archives
+        .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
+        // python.client.executable
+        .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, 
ConfigConst.PYTHON_EXECUTABLE)
+        // python.executable
+        .safeSet(PythonOptions.PYTHON_EXECUTABLE, 
ConfigConst.PYTHON_EXECUTABLE)
+
+      val pythonFlinkconnectorJars: String =
+        
FlinkUtils.getPythonFlinkconnectorJars(submitRequest.flinkVersion.flinkHome)
+      if (StringUtils.isNotBlank(pythonFlinkconnectorJars)) {
+        flinkConfig.setString(PipelineOptions.JARS.key(), 
pythonFlinkconnectorJars)
+      }
+
+      packageProgram = PackagedProgram.newBuilder
+        .setEntryPointClassName(ConfigConst.PYTHON_DRIVER_CLASS_NAME)
+        .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
+        .setArguments("-py", submitRequest.pyflinkFilePath)
+        .build()
+    } else {
+      packageProgram = PackagedProgram.newBuilder
+        .setJarFile(jarFile)
+        .setEntryPointClassName(
+          
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
+        .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
+        .setArguments(
+          flinkConfig
+            .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
+            .orElse(Lists.newArrayList()): _*)
+        .build()
+    }
 
     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
index 58d38d311..11364f02b 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
@@ -48,6 +48,22 @@ object FlinkUtils {
     }
   }
 
+  /**
+   * Return a sample value:
+   *
+   * 
file:///flink-1.16.2/lib/flink-connector-jdbc-3.1.0-1.16.jar;file:///flink-1.16.2/lib/flink-sql-connector-mysql-cdc-2.4.0.jar
+   * @param flinkHome
+   * @return
+   *   flink-connector-xxx.jar and flink-sql-connector-xxx.jar
+   */
+  def getPythonFlinkconnectorJars(flinkHome: String): String = {
+    new 
File(s"$flinkHome/lib").list().filter(_.matches("flink.*connector.*\\.jar")) 
match {
+      case array if array.length > 0 =>
+        array.map(jar => s"file://$flinkHome/lib/$jar").mkString(";")
+      case _ => ""
+    }
+  }
+
   def isCheckpointEnabled(map: util.Map[String, String]): Boolean = {
     val checkpointInterval: Duration = TimeUtils.parseDuration(
       
map.getOrDefault(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key, 
"0ms"))

Reply via email to