This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 4754e9291 Flink yarn application support pyflink maven dependency (#3042)
4754e9291 is described below
commit 4754e92918b1832ee41442d4d0c2bfaae64b7111
Author: ChengJie1053 <[email protected]>
AuthorDate: Mon Sep 11 22:47:07 2023 +0800
Flink yarn application support pyflink maven dependency (#3042)
---
.../streampark/common/util/DeflaterUtils.scala | 3 +++
.../apache/streampark/common/util/FileUtils.scala | 8 ++++++++
.../streampark/console/core/entity/Application.java | 6 ++++++
.../impl/ApplicationManageServiceImpl.java | 2 +-
.../core/service/impl/AppBuildPipeServiceImpl.java | 5 ++++-
.../src/views/flink/app/Add.vue | 20 ++++++++++++++++++++
.../views/flink/app/hooks/useCreateAndEditSchema.ts | 10 +++++-----
.../flink/client/impl/YarnApplicationClient.scala | 7 ++++++-
.../impl/FlinkYarnApplicationBuildPipeline.scala | 4 ++--
9 files changed, 55 insertions(+), 10 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/DeflaterUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/DeflaterUtils.scala
index 7069135e9..78beb3369 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/DeflaterUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/DeflaterUtils.scala
@@ -16,6 +16,8 @@
*/
package org.apache.streampark.common.util
+import org.apache.commons.lang3.StringUtils
+
import java.io.ByteArrayOutputStream
import java.util.Base64
import java.util.zip.{DataFormatException, Deflater, Inflater}
@@ -24,6 +26,7 @@ object DeflaterUtils {
/** Compress the specified text */
def zipString(text: String): String = {
+ if (StringUtils.isBlank(text)) return ""
// compression level (0 ~ 9): low to high
// create a new deflater with the specified compression level
val deflater = new Deflater(Deflater.BEST_COMPRESSION)
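
Note: the guard above makes zipString a no-op for blank input. A minimal usage sketch (illustrative REPL-style calls, not part of the commit; StringUtils.isBlank treats null, "", and whitespace-only strings as blank):

    DeflaterUtils.zipString(null)        // now returns "" instead of throwing a NullPointerException
    DeflaterUtils.zipString("   ")       // whitespace-only input also returns ""
    DeflaterUtils.zipString("select 1")  // non-blank input is deflated and Base64-encoded as before
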
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 1522a4e8a..6b95884bc 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -123,6 +123,14 @@ object FileUtils {
}
}
+ def directoryNotBlank(file: Serializable): Boolean = {
+ file match {
+ case null => false
+ case f: File => f.isDirectory && f.list().length > 0
+ case p => new File(p.toString).isDirectory && new File(p.toString).list().length > 0
+ }
+ }
+
def equals(file1: File, file2: File): Boolean = {
(file1, file2) match {
case (a, b) if a == null || b == null => false
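
Note: directoryNotBlank returns true only for an existing, non-empty directory. Illustrative calls (paths are hypothetical):

    FileUtils.directoryNotBlank(null)                      // false
    FileUtils.directoryNotBlank("/path/to/some/file.jar")  // false: not a directory
    FileUtils.directoryNotBlank(new File("/tmp/app/lib"))  // true only if the directory exists and has at least one entry
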
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index acdb140a9..8eb0e39ed 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -449,6 +449,12 @@ public class Application implements Serializable {
return DevelopmentMode.FLINK_SQL.getValue().equals(this.getJobType());
}
+ @JsonIgnore
+ public boolean isFlinkSqlJobOrPyFlinkJob() {
+ return DevelopmentMode.FLINK_SQL.getValue().equals(this.getJobType())
+ || DevelopmentMode.PYFLINK.getValue().equals(this.getJobType());
+ }
+
@JsonIgnore
public boolean isCustomCodeJob() {
return DevelopmentMode.CUSTOM_CODE.getValue().equals(this.getJobType());
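
Note: the new predicate widens the existing FlinkSQL-only checks so PyFlink jobs take the same code paths. An illustrative call in Scala (assumes the entity's usual getter/setter pair for jobType, which is not shown in this hunk):

    val app = new Application()
    app.setJobType(DevelopmentMode.PYFLINK.getValue)
    app.isFlinkSqlJob()             // false
    app.isFlinkSqlJobOrPyFlinkJob() // true: PyFlink now follows the FlinkSQL branches used below
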
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index ea8fc61c1..11b0f59a1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -315,7 +315,7 @@ public class ApplicationManageServiceImpl extends ServiceImpl<ApplicationMapper,
}
if (save(appParam)) {
- if (appParam.isFlinkSqlJob()) {
+ if (appParam.isFlinkSqlJobOrPyFlinkJob()) {
FlinkSql flinkSql = new FlinkSql(appParam);
flinkSqlService.create(flinkSql);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 7260fc72f..147137800 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -201,7 +201,7 @@ public class AppBuildPipeServiceImpl
// 1) flink sql setDependency
FlinkSql newFlinkSql = flinkSqlService.getCandidate(app.getId(), CandidateType.NEW);
FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false);
- if (app.isFlinkSqlJob()) {
+ if (app.isFlinkSqlJobOrPyFlinkJob()) {
FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql;
Utils.notNull(flinkSql);
app.setDependency(flinkSql.getDependency());
@@ -618,6 +618,9 @@ public class AppBuildPipeServiceImpl
private DependencyInfo getMergedDependencyInfo(Application application) {
DependencyInfo dependencyInfo = application.getDependencyInfo();
+ if (StringUtils.isBlank(application.getTeamResource())) {
+ return dependencyInfo;
+ }
try {
String[] resourceIds = JacksonUtils.read(application.getTeamResource(), String[].class);
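
Note: with the blank check in place, getMergedDependencyInfo short-circuits when no team resources are attached, instead of handing a null or blank string to JacksonUtils.read. A standalone Scala sketch of the guard (names mirror the diff, types simplified; the merge body is a placeholder):

    import org.apache.commons.lang3.StringUtils

    def mergedDependency(teamResource: String, ownDependency: String): String = {
      // nothing attached: skip JSON parsing and return the application's own dependency info
      if (StringUtils.isBlank(teamResource)) return ownDependency
      s"$ownDependency merged with resources $teamResource" // placeholder for the real merge
    }
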
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
index 677dc74ec..64e17fe6a 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
@@ -157,10 +157,30 @@
/* custom mode */
async function handleSubmitCustomJob(values) {
handleCluster(values);
+ // Trigger a pom confirmation operation.
+ await unref(dependencyRef)?.handleApplyPom();
+ // common params...
+ const dependency: { pom?: string; jar?: string } = {};
+ const dependencyRecords = unref(dependencyRef)?.dependencyRecords;
+ const uploadJars = unref(dependencyRef)?.uploadJars;
+ if (unref(dependencyRecords) && unref(dependencyRecords).length > 0) {
+ Object.assign(dependency, {
+ pom: unref(dependencyRecords),
+ });
+ }
+ if (uploadJars && unref(uploadJars).length > 0) {
+ Object.assign(dependency, {
+ jar: unref(uploadJars),
+ });
+ }
const params = {
jobType: values.jobType === 'pyflink' ? JobTypeEnum.PYFLINK : JobTypeEnum.JAR,
projectId: values.project || null,
module: values.module || null,
+ dependency:
+ dependency.pom === undefined && dependency.jar === undefined
+ ? null
+ : JSON.stringify(dependency),
appType: values.appType,
};
handleSubmitParams(params, values, k8sTemplate);
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index a6fbd022f..d66240c3e 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -141,11 +141,11 @@ export const useCreateAndEditSchema = (
component: 'Input',
slot: 'dependency',
ifShow: ({ values }) => {
- if (edit?.appId) {
- return values.jobType == JobTypeEnum.SQL;
- } else {
- return values?.jobType == 'sql';
- }
+ if (edit?.appId) {
+ return values.jobType == JobTypeEnum.SQL || values.jobType == JobTypeEnum.PYFLINK;
+ } else {
+ return values?.jobType == 'sql' || values?.jobType == 'pyflink';
+ }
},
},
{ field: 'configOverride', label: '', component: 'Input', show: false },
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 8444aa2b7..b585a1df8 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.client.impl
import org.apache.streampark.common.conf.{ConfigConst, Workspace}
import org.apache.streampark.common.enums.DevelopmentMode
import org.apache.streampark.common.fs.FsOperator
-import org.apache.streampark.common.util.{HdfsUtils, Utils}
+import org.apache.streampark.common.util.{FileUtils, HdfsUtils, Utils}
import org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
@@ -101,6 +101,11 @@ object YarnApplicationClient extends YarnClientTrait {
throw new RuntimeException(s"$pyVenv File does not exist")
}
+ val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
+ if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
+ flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
+ }
+
// yarn.ship-files
val shipFiles = new util.ArrayList[String]()
shipFiles.add(submitRequest.userJarFile.getParentFile.getAbsolutePath)
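
Note: PipelineOptions.JARS is Flink's pipeline.jars option, so when the application's local lib directory exists and is non-empty, it is handed to the YARN application as the jar list. A minimal sketch with plain Flink APIs (safeSet is StreamPark's null-safe wrapper around set; the path below is an example, not from the commit):

    import java.util
    import org.apache.flink.configuration.{Configuration, PipelineOptions}

    val flinkConfig = new Configuration()
    val localLib = "/opt/streampark/workspace/100001/lib" // hypothetical app workspace lib dir
    flinkConfig.set(PipelineOptions.JARS, util.Arrays.asList(localLib))
    // flinkConfig now carries pipeline.jars -> [/opt/streampark/workspace/100001/lib]
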
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index 3606107c9..2902c339c 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -47,7 +47,7 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques
override protected def buildProcess(): SimpleBuildResponse = {
execStep(1) {
request.developmentMode match {
- case DevelopmentMode.FLINK_SQL =>
+ case DevelopmentMode.FLINK_SQL | DevelopmentMode.PYFLINK =>
LfsOperator.mkCleanDirs(request.localWorkspace)
HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
case _ =>
@@ -58,7 +58,7 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques
val mavenJars =
execStep(2) {
request.developmentMode match {
- case DevelopmentMode.FLINK_SQL =>
+ case DevelopmentMode.FLINK_SQL | DevelopmentMode.PYFLINK =>
val mavenArts = MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
mavenArts.map(_.getAbsolutePath) ++ request.dependencyInfo.extJarLibs
case _ => List[String]()
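
Note: taken together, the two hunks above make the build pipeline treat PyFlink like Flink SQL: step 1 cleans the local and HDFS workspaces, and step 2 resolves the job's Maven dependencies into shippable jars. A condensed sketch of the widened dispatch (names as in the diff, other branches elided):

    request.developmentMode match {
      case DevelopmentMode.FLINK_SQL | DevelopmentMode.PYFLINK =>
        // resolve pom coordinates to local jar paths, then append any extra jar libs
        val mavenArts = MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
        mavenArts.map(_.getAbsolutePath) ++ request.dependencyInfo.extJarLibs
      case _ => List[String]()
    }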