This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new b38079157 fix pyflink bug (#3016)
b38079157 is described below
commit b38079157c532a43bf9473776e68382361c0739a
Author: ChengJie1053 <[email protected]>
AuthorDate: Sun Sep 3 23:08:27 2023 +0800
fix pyflink bug (#3016)
---
.../streampark/console/core/entity/Application.java | 12 ++++++++++--
.../core/service/impl/AppBuildPipeServiceImpl.java | 6 ++++--
.../streampark-console-webapp/src/enums/flinkEnum.ts | 1 +
.../src/views/flink/app/Add.vue | 6 +++---
.../src/views/flink/app/hooks/useCreateSchema.ts | 7 +++++++
.../flink/client/impl/YarnApplicationClient.scala | 18 +++++++++++++-----
.../flink/client/trait/FlinkClientTrait.scala | 7 ++++---
7 files changed, 42 insertions(+), 15 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index edb56c6a0..b12218169 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -454,14 +454,22 @@ public class Application implements Serializable {
return DevelopmentMode.CUSTOM_CODE.getValue().equals(this.getJobType());
}
+ @JsonIgnore
+ public boolean isCustomCodeOrPyFlinkJob() {
+ return DevelopmentMode.CUSTOM_CODE.getValue().equals(this.getJobType())
+ || DevelopmentMode.PYFLINK.getValue().equals(this.getJobType());
+ }
+
@JsonIgnore
public boolean isUploadJob() {
- return isCustomCodeJob() && ResourceFrom.UPLOAD.getValue().equals(this.getResourceFrom());
+ return isCustomCodeOrPyFlinkJob()
+ && ResourceFrom.UPLOAD.getValue().equals(this.getResourceFrom());
}
@JsonIgnore
public boolean isCICDJob() {
- return isCustomCodeJob() && ResourceFrom.CICD.getValue().equals(this.getResourceFrom());
+ return isCustomCodeOrPyFlinkJob()
+ && ResourceFrom.CICD.getValue().equals(this.getResourceFrom());
}
public boolean isStreamParkJob() {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 8fd23043f..7bbbb5afe 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -234,7 +234,7 @@ public class AppBuildPipeServiceImpl
// 2) some preparatory work
String appUploads = app.getWorkspace().APP_UPLOADS();
- if (app.isCustomCodeJob()) {
+ if (app.isCustomCodeOrPyFlinkJob()) {
// customCode upload jar to appHome...
String appHome = app.getAppHome();
FsOperator fsOperator = app.getFsOperator();
@@ -465,7 +465,7 @@ public class AppBuildPipeServiceImpl
app.getLocalAppHome(),
mainClass,
flinkUserJar,
- app.isCustomCodeJob(),
+ app.isCustomCodeOrPyFlinkJob(),
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
@@ -532,6 +532,8 @@ public class AppBuildPipeServiceImpl
"[StreamPark] unsupported ApplicationType of custom code: "
+ app.getApplicationType());
}
+ case PYFLINK:
+ return String.format("%s/%s", app.getAppHome(), app.getJar());
case FLINK_SQL:
String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) {
diff --git a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index c7c232125..5e02fdb61 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -167,6 +167,7 @@ export enum AppTypeEnum {
export enum JobTypeEnum {
JAR = 1,
SQL = 2,
+ PYFLINK = 3,
}
export enum ConfigTypeEnum {
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
index 727c9b5d2..677dc74ec 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
@@ -158,7 +158,7 @@
async function handleSubmitCustomJob(values) {
handleCluster(values);
const params = {
- jobType: JobTypeEnum.JAR,
+ jobType: values.jobType === 'pyflink' ?JobTypeEnum.PYFLINK : JobTypeEnum.JAR,
projectId: values.project || null,
module: values.module || null,
appType: values.appType,
@@ -254,8 +254,8 @@
}
}
}
- if (formValue.jobType === 'customcode') {
- handleSubmitCustomJob(formValue);
+ if (formValue.jobType === 'customcode' || formValue.jobType === 'pyflink') {
+ handleSubmitCustomJob(formValue);
} else {
handleSubmitSQL(formValue);
}
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
index 2b46d3499..65f5ba5f3 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
@@ -45,6 +45,13 @@ const getJobTypeOptions = () => {
]),
value: 'sql',
},
+ {
+ label: h('div', {}, [
+ h(SvgIcon, { name: 'pyflink', color: '#108ee9' }, ''),
+ h('span', { class: 'pl-10px' }, 'Python Flink'),
+ ]),
+ value: 'pyflink',
+ },
];
};
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 928e06414..8444aa2b7 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -109,11 +109,19 @@ object YarnApplicationClient extends YarnClientTrait {
.safeSet(YarnConfigOptions.SHIP_FILES, shipFiles)
// python.files
.safeSet(PythonOptions.PYTHON_FILES, submitRequest.userJarFile.getParentFile.getName)
-
- val args = flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS)
- args.add("-pym")
- args.add(submitRequest.userJarFile.getName.dropRight(ConfigConst.PYTHON_SUFFIX.length))
- flinkConfig.safeSet(ApplicationConfiguration.APPLICATION_ARGS, args)
+ // python.archives
+ .safeSet(PythonOptions.PYTHON_ARCHIVES, pyVenv)
+ // python.client.executable
+ .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
+ // python.executable
+ .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
+
+ val args: util.List[String] = flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS)
+ // Caused by: java.lang.UnsupportedOperationException
+ val argsList: util.ArrayList[String] = new util.ArrayList[String](args)
+ argsList.add("-pym")
+ argsList.add(submitRequest.userJarFile.getName.dropRight(ConfigConst.PYTHON_SUFFIX.length))
+ flinkConfig.safeSet(ApplicationConfiguration.APPLICATION_ARGS, argsList)
}
logInfo(s"""
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 2e48f7939..59aa46cc3 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -41,7 +41,6 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
import org.apache.flink.util.FlinkException
import org.apache.flink.util.Preconditions.checkNotNull
-import java.io.File
import java.util.{Collections, List => JavaList, Map => JavaMap}
import scala.annotation.tailrec
@@ -251,7 +250,6 @@ trait FlinkClientTrait extends Logger {
}
val packageProgram = PackagedProgram.newBuilder
- .setJarFile(submitRequest.userJarFile)
.setArguments(
flinkConfig
.getOptional(ApplicationConfiguration.APPLICATION_ARGS)
@@ -508,7 +506,10 @@ trait FlinkClientTrait extends Logger {
}
}
- if (submitRequest.developmentMode == DevelopmentMode.PYFLINK) {
+ if (
+ submitRequest.developmentMode == DevelopmentMode.PYFLINK && !submitRequest.executionMode
+ .equals(ExecutionMode.YARN_APPLICATION)
+ ) {
// python file
programArgs.add("-py")
programArgs.add(submitRequest.userJarFile.getAbsolutePath)