This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new b38079157 fix pyflink bug (#3016)
b38079157 is described below

commit b38079157c532a43bf9473776e68382361c0739a
Author: ChengJie1053 <[email protected]>
AuthorDate: Sun Sep 3 23:08:27 2023 +0800

    fix pyflink bug (#3016)
---
 .../streampark/console/core/entity/Application.java    | 12 ++++++++++--
 .../core/service/impl/AppBuildPipeServiceImpl.java     |  6 ++++--
 .../streampark-console-webapp/src/enums/flinkEnum.ts   |  1 +
 .../src/views/flink/app/Add.vue                        |  6 +++---
 .../src/views/flink/app/hooks/useCreateSchema.ts       |  7 +++++++
 .../flink/client/impl/YarnApplicationClient.scala      | 18 +++++++++++++-----
 .../flink/client/trait/FlinkClientTrait.scala          |  7 ++++---
 7 files changed, 42 insertions(+), 15 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index edb56c6a0..b12218169 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -454,14 +454,22 @@ public class Application implements Serializable {
     return DevelopmentMode.CUSTOM_CODE.getValue().equals(this.getJobType());
   }
 
+  @JsonIgnore
+  public boolean isCustomCodeOrPyFlinkJob() {
+    return DevelopmentMode.CUSTOM_CODE.getValue().equals(this.getJobType())
+        || DevelopmentMode.PYFLINK.getValue().equals(this.getJobType());
+  }
+
   @JsonIgnore
   public boolean isUploadJob() {
-    return isCustomCodeJob() && ResourceFrom.UPLOAD.getValue().equals(this.getResourceFrom());
+    return isCustomCodeOrPyFlinkJob()
+        && ResourceFrom.UPLOAD.getValue().equals(this.getResourceFrom());
   }
 
   @JsonIgnore
   public boolean isCICDJob() {
-    return isCustomCodeJob() && ResourceFrom.CICD.getValue().equals(this.getResourceFrom());
+    return isCustomCodeOrPyFlinkJob()
+        && ResourceFrom.CICD.getValue().equals(this.getResourceFrom());
   }
 
   public boolean isStreamParkJob() {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 8fd23043f..7bbbb5afe 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -234,7 +234,7 @@ public class AppBuildPipeServiceImpl
             // 2) some preparatory work
             String appUploads = app.getWorkspace().APP_UPLOADS();
 
-            if (app.isCustomCodeJob()) {
+            if (app.isCustomCodeOrPyFlinkJob()) {
               // customCode upload jar to appHome...
               String appHome = app.getAppHome();
               FsOperator fsOperator = app.getFsOperator();
@@ -465,7 +465,7 @@ public class AppBuildPipeServiceImpl
                 app.getLocalAppHome(),
                 mainClass,
                 flinkUserJar,
-                app.isCustomCodeJob(),
+                app.isCustomCodeOrPyFlinkJob(),
                 app.getExecutionModeEnum(),
                 app.getDevelopmentMode(),
                 flinkEnv.getFlinkVersion(),
@@ -532,6 +532,8 @@ public class AppBuildPipeServiceImpl
                 "[StreamPark] unsupported ApplicationType of custom code: "
                     + app.getApplicationType());
         }
+      case PYFLINK:
+        return String.format("%s/%s", app.getAppHome(), app.getJar());
       case FLINK_SQL:
         String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
         if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
diff --git a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index c7c232125..5e02fdb61 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -167,6 +167,7 @@ export enum AppTypeEnum {
 export enum JobTypeEnum {
   JAR = 1,
   SQL = 2,
+  PYFLINK = 3,
 }
 
 export enum ConfigTypeEnum {
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
index 727c9b5d2..677dc74ec 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
@@ -158,7 +158,7 @@
   async function handleSubmitCustomJob(values) {
     handleCluster(values);
     const params = {
-      jobType: JobTypeEnum.JAR,
+      jobType: values.jobType === 'pyflink' ? JobTypeEnum.PYFLINK : JobTypeEnum.JAR,
       projectId: values.project || null,
       module: values.module || null,
       appType: values.appType,
@@ -254,8 +254,8 @@
           }
         }
       }
-      if (formValue.jobType === 'customcode') {
-        handleSubmitCustomJob(formValue);
+      if (formValue.jobType === 'customcode' || formValue.jobType === 'pyflink') {
+        handleSubmitCustomJob(formValue);
       } else {
         handleSubmitSQL(formValue);
       }
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
index 2b46d3499..65f5ba5f3 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
@@ -45,6 +45,13 @@ const getJobTypeOptions = () => {
       ]),
       value: 'sql',
     },
+    {
+      label: h('div', {}, [
+        h(SvgIcon, { name: 'pyflink', color: '#108ee9' }, ''),
+        h('span', { class: 'pl-10px' }, 'Python Flink'),
+      ]),
+      value: 'pyflink',
+    },
   ];
 };
 
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 928e06414..8444aa2b7 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -109,11 +109,19 @@ object YarnApplicationClient extends YarnClientTrait {
         .safeSet(YarnConfigOptions.SHIP_FILES, shipFiles)
         // python.files
         .safeSet(PythonOptions.PYTHON_FILES, submitRequest.userJarFile.getParentFile.getName)
-
-      val args = flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS)
-      args.add("-pym")
-      args.add(submitRequest.userJarFile.getName.dropRight(ConfigConst.PYTHON_SUFFIX.length))
-      flinkConfig.safeSet(ApplicationConfiguration.APPLICATION_ARGS, args)
+        // python.archives
+        .safeSet(PythonOptions.PYTHON_ARCHIVES, pyVenv)
+        // python.client.executable
+        .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
+        // python.executable
+        .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
+
+      val args: util.List[String] = flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS)
+      // Caused by: java.lang.UnsupportedOperationException
+      val argsList: util.ArrayList[String] = new util.ArrayList[String](args)
+      argsList.add("-pym")
+      argsList.add(submitRequest.userJarFile.getName.dropRight(ConfigConst.PYTHON_SUFFIX.length))
+      flinkConfig.safeSet(ApplicationConfiguration.APPLICATION_ARGS, argsList)
     }
 
     logInfo(s"""
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 2e48f7939..59aa46cc3 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -41,7 +41,6 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
 import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull
 
-import java.io.File
 import java.util.{Collections, List => JavaList, Map => JavaMap}
 
 import scala.annotation.tailrec
@@ -251,7 +250,6 @@ trait FlinkClientTrait extends Logger {
     }
 
     val packageProgram = PackagedProgram.newBuilder
-      .setJarFile(submitRequest.userJarFile)
       .setArguments(
         flinkConfig
           .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
@@ -508,7 +506,10 @@ trait FlinkClientTrait extends Logger {
       }
     }
 
-    if (submitRequest.developmentMode == DevelopmentMode.PYFLINK) {
+    if (
+      submitRequest.developmentMode == DevelopmentMode.PYFLINK && !submitRequest.executionMode
+        .equals(ExecutionMode.YARN_APPLICATION)
+    ) {
       // python file
       programArgs.add("-py")
       programArgs.add(submitRequest.userJarFile.getAbsolutePath)
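
For context on the FlinkClientTrait change: in YARN application mode the Python entry point is now supplied by YarnApplicationClient via PythonOptions and "-pym" (see the hunk above), so "-py <file>" should only be appended for the other execution modes. A condensed sketch of that decision, assuming the StreamPark enums live under org.apache.streampark.common.enums (this helper is illustrative, not part of the patch):

    import org.apache.streampark.common.enums.{DevelopmentMode, ExecutionMode}

    // Illustrative helper: append "-py <file>" only for PyFlink jobs that are NOT
    // submitted in YARN application mode, where "-pym" is already set elsewhere.
    def shouldAppendPyFile(devMode: DevelopmentMode, execMode: ExecutionMode): Boolean =
      devMode == DevelopmentMode.PYFLINK && execMode != ExecutionMode.YARN_APPLICATION
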
