This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4754e9291 Flink yarn application support pyflink maven dependency (#3042)
4754e9291 is described below

commit 4754e92918b1832ee41442d4d0c2bfaae64b7111
Author: ChengJie1053 <[email protected]>
AuthorDate: Mon Sep 11 22:47:07 2023 +0800

    Flink yarn application support pyflink maven dependency (#3042)
---
 .../streampark/common/util/DeflaterUtils.scala       |  3 +++
 .../apache/streampark/common/util/FileUtils.scala    |  8 ++++++++
 .../streampark/console/core/entity/Application.java  |  6 ++++++
 .../impl/ApplicationManageServiceImpl.java           |  2 +-
 .../core/service/impl/AppBuildPipeServiceImpl.java   |  5 ++++-
 .../src/views/flink/app/Add.vue                      | 20 ++++++++++++++++++++
 .../views/flink/app/hooks/useCreateAndEditSchema.ts  | 10 +++++-----
 .../flink/client/impl/YarnApplicationClient.scala    |  7 ++++++-
 .../impl/FlinkYarnApplicationBuildPipeline.scala     |  4 ++--
 9 files changed, 55 insertions(+), 10 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/DeflaterUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/DeflaterUtils.scala
index 7069135e9..78beb3369 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/DeflaterUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/DeflaterUtils.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.streampark.common.util
 
+import org.apache.commons.lang3.StringUtils
+
 import java.io.ByteArrayOutputStream
 import java.util.Base64
 import java.util.zip.{DataFormatException, Deflater, Inflater}
@@ -24,6 +26,7 @@ object DeflaterUtils {
 
   /** Compress the specified text */
   def zipString(text: String): String = {
+    if (StringUtils.isBlank(text)) return ""
     // compression level (0 ~ 9): low to high
     // create a new deflater with the specified compression level
     val deflater = new Deflater(Deflater.BEST_COMPRESSION)
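
For context, the guard added above turns zipString into a no-op on blank
input. A minimal, self-contained sketch of the guarded path (JDK only; the
commons-lang3 StringUtils.isBlank call is approximated with a trim check,
and the Base64 step is inferred from this file's imports):

    import java.io.ByteArrayOutputStream
    import java.util.Base64
    import java.util.zip.Deflater

    object ZipStringSketch {
      def zipString(text: String): String = {
        // the new guard: blank input returns "" before any Deflater work
        if (text == null || text.trim.isEmpty) return ""
        val deflater = new Deflater(Deflater.BEST_COMPRESSION)
        deflater.setInput(text.getBytes("UTF-8"))
        deflater.finish()
        val out = new ByteArrayOutputStream()
        val buffer = new Array[Byte](1024)
        while (!deflater.finished()) {
          out.write(buffer, 0, deflater.deflate(buffer))
        }
        deflater.end()
        Base64.getEncoder.encodeToString(out.toByteArray)
      }

      def main(args: Array[String]): Unit = {
        println(zipString("   "))      // "" -- hits the new guard
        println(zipString("select 1")) // base64-encoded deflate payload
      }
    }
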
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 1522a4e8a..6b95884bc 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -123,6 +123,14 @@ object FileUtils {
     }
   }
 
+  def directoryNotBlank(file: Serializable): Boolean = {
+    file match {
+      case null => false
+      case f: File => f.isDirectory && f.list().length > 0
+    case p => new File(p.toString).isDirectory && new File(p.toString).list().length > 0
+    }
+  }
+
   def equals(file1: File, file2: File): Boolean = {
     (file1, file2) match {
       case (a, b) if a == null || b == null => false
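
The new helper answers a single question: does this value name a directory
with at least one entry? A standalone sketch of the same contract
(java.io.File implements Serializable, so both File and String inputs
match; the fallback branch here builds the File once rather than twice):

    import java.io.{File, Serializable}

    object DirectoryNotBlankSketch {
      def directoryNotBlank(file: Serializable): Boolean = file match {
        case null    => false
        case f: File => f.isDirectory && f.list().length > 0
        case p =>
          val f = new File(p.toString)
          f.isDirectory && f.list().length > 0
      }

      def main(args: Array[String]): Unit = {
        println(directoryNotBlank(null))           // false
        println(directoryNotBlank("no/such/dir"))  // false -- isDirectory short-circuits
        println(directoryNotBlank(new File(".")))  // true iff the cwd is non-empty
      }
    }
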
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index acdb140a9..8eb0e39ed 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -449,6 +449,12 @@ public class Application implements Serializable {
     return DevelopmentMode.FLINK_SQL.getValue().equals(this.getJobType());
   }
 
+  @JsonIgnore
+  public boolean isFlinkSqlJobOrPyFlinkJob() {
+    return DevelopmentMode.FLINK_SQL.getValue().equals(this.getJobType())
+        || DevelopmentMode.PYFLINK.getValue().equals(this.getJobType());
+  }
+
   @JsonIgnore
   public boolean isCustomCodeJob() {
     return DevelopmentMode.CUSTOM_CODE.getValue().equals(this.getJobType());
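
The new predicate widens the existing isFlinkSqlJob check so PyFlink jobs
take the same persistence and build paths as SQL jobs in the call sites
below. The same idea as a tiny Scala sketch (the integer codes are
placeholders, not the real DevelopmentMode values):

    object JobTypeSketch {
      val FLINK_SQL = 1 // placeholder code
      val PYFLINK   = 2 // placeholder code

      def isFlinkSqlJobOrPyFlinkJob(jobType: Int): Boolean =
        jobType == FLINK_SQL || jobType == PYFLINK
    }
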
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index ea8fc61c1..11b0f59a1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -315,7 +315,7 @@ public class ApplicationManageServiceImpl extends ServiceImpl<ApplicationMapper,
     }
 
     if (save(appParam)) {
-      if (appParam.isFlinkSqlJob()) {
+      if (appParam.isFlinkSqlJobOrPyFlinkJob()) {
         FlinkSql flinkSql = new FlinkSql(appParam);
         flinkSqlService.create(flinkSql);
       }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 7260fc72f..147137800 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -201,7 +201,7 @@ public class AppBuildPipeServiceImpl
     // 1) flink sql setDependency
    FlinkSql newFlinkSql = flinkSqlService.getCandidate(app.getId(), CandidateType.NEW);
    FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false);
-    if (app.isFlinkSqlJob()) {
+    if (app.isFlinkSqlJobOrPyFlinkJob()) {
      FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql;
       Utils.notNull(flinkSql);
       app.setDependency(flinkSql.getDependency());
@@ -618,6 +618,9 @@ public class AppBuildPipeServiceImpl
 
   private DependencyInfo getMergedDependencyInfo(Application application) {
     DependencyInfo dependencyInfo = application.getDependencyInfo();
+    if (StringUtils.isBlank(application.getTeamResource())) {
+      return dependencyInfo;
+    }
 
     try {
      String[] resourceIds = JacksonUtils.read(application.getTeamResource(), String[].class);
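
The added check short-circuits dependency merging when no team resources
are configured, so a blank field never reaches the JSON parser. The shape
of the change, sketched with simplified types (Jackson's ObjectMapper
stands in for the project's JacksonUtils wrapper):

    import com.fasterxml.jackson.databind.ObjectMapper

    object MergedDependencySketch {
      case class DependencyInfo(mavenArts: List[String], extJarLibs: List[String])

      def getMergedDependencyInfo(base: DependencyInfo, teamResource: String): DependencyInfo = {
        // new guard: nothing to merge when the team resource field is blank
        if (teamResource == null || teamResource.trim.isEmpty) return base
        val resourceIds = new ObjectMapper().readValue(teamResource, classOf[Array[String]])
        // ... merge each resource's dependencies into `base` (elided)
        base
      }
    }
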
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
index 677dc74ec..64e17fe6a 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
@@ -157,10 +157,30 @@
   /* custom mode */
   async function handleSubmitCustomJob(values) {
     handleCluster(values);
+    // Trigger a pom confirmation operation.
+    await unref(dependencyRef)?.handleApplyPom();
+    // common params...
+    const dependency: { pom?: string; jar?: string } = {};
+    const dependencyRecords = unref(dependencyRef)?.dependencyRecords;
+    const uploadJars = unref(dependencyRef)?.uploadJars;
+    if (unref(dependencyRecords) && unref(dependencyRecords).length > 0) {
+      Object.assign(dependency, {
+        pom: unref(dependencyRecords),
+      });
+    }
+    if (uploadJars && unref(uploadJars).length > 0) {
+      Object.assign(dependency, {
+        jar: unref(uploadJars),
+      });
+    }
     const params = {
      jobType: values.jobType === 'pyflink' ? JobTypeEnum.PYFLINK : JobTypeEnum.JAR,
       projectId: values.project || null,
       module: values.module || null,
+      dependency:
+        dependency.pom === undefined && dependency.jar === undefined
+          ? null
+          : JSON.stringify(dependency),
       appType: values.appType,
     };
     handleSubmitParams(params, values, k8sTemplate);
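
In spirit, the handler now gathers confirmed pom records and uploaded jars
into a dependency payload and serializes it only when at least one list is
non-empty; otherwise dependency stays null. The same decision, sketched in
Scala for consistency with the rest of this commit (names illustrative):

    object DependencyPayloadSketch {
      private def quote(s: String) = "\"" + s + "\""
      private def field(name: String, xs: Seq[String]): Option[String] =
        if (xs.isEmpty) None
        else Some(quote(name) + ":[" + xs.map(quote).mkString(",") + "]")

      // None mirrors `dependency: null` above; Some(json) mirrors JSON.stringify
      def payload(pom: Seq[String], jar: Seq[String]): Option[String] = {
        val parts = field("pom", pom).toSeq ++ field("jar", jar).toSeq
        if (parts.isEmpty) None else Some(parts.mkString("{", ",", "}"))
      }

      def main(args: Array[String]): Unit = {
        println(payload(Nil, Nil))                         // None
        println(payload(Seq("org.example:demo:1.0"), Nil)) // Some({"pom":["org.example:demo:1.0"]})
      }
    }
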
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index a6fbd022f..d66240c3e 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -141,11 +141,11 @@ export const useCreateAndEditSchema = (
         component: 'Input',
         slot: 'dependency',
         ifShow: ({ values }) => {
-          if (edit?.appId) {
-            return values.jobType == JobTypeEnum.SQL;
-          } else {
-            return values?.jobType == 'sql';
-          }
+          if (edit?.appId) {
+            return values.jobType == JobTypeEnum.SQL || values.jobType == JobTypeEnum.PYFLINK;
+          } else {
+            return values?.jobType == 'sql' || values?.jobType == 'pyflink';
+          }
         },
       },
       { field: 'configOverride', label: '', component: 'Input', show: false },
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index 8444aa2b7..b585a1df8 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.client.impl
 import org.apache.streampark.common.conf.{ConfigConst, Workspace}
 import org.apache.streampark.common.enums.DevelopmentMode
 import org.apache.streampark.common.fs.FsOperator
-import org.apache.streampark.common.util.{HdfsUtils, Utils}
+import org.apache.streampark.common.util.{FileUtils, HdfsUtils, Utils}
 import org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
@@ -101,6 +101,11 @@ object YarnApplicationClient extends YarnClientTrait {
         throw new RuntimeException(s"$pyVenv File does not exist")
       }
 
+      val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
+      if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
+        flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
+      }
+
       // yarn.ship-files
       val shipFiles = new util.ArrayList[String]()
       shipFiles.add(submitRequest.userJarFile.getParentFile.getAbsolutePath)
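
The effect of the new block: when the per-job lib directory holds resolved
Maven jars, it is handed to Flink via pipeline.jars. A minimal sketch
against the stock Flink Configuration API (safeSet above is StreamPark's
null-safe wrapper; plain set is used here, and the path is a placeholder):

    import java.util
    import org.apache.flink.configuration.{Configuration, PipelineOptions}

    object PipelineJarsSketch {
      def main(args: Array[String]): Unit = {
        val flinkConfig = new Configuration()
        val localLib = "/tmp/streampark/workspace/100001/lib" // placeholder lib dir
        val dir = new java.io.File(localLib)
        // same guard as above: only an existing, non-empty directory is shipped
        if (dir.isDirectory && dir.list().nonEmpty) {
          flinkConfig.set(PipelineOptions.JARS, util.Arrays.asList(localLib))
        }
        println(flinkConfig)
      }
    }
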
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index 3606107c9..2902c339c 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -47,7 +47,7 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques
   override protected def buildProcess(): SimpleBuildResponse = {
     execStep(1) {
       request.developmentMode match {
-        case DevelopmentMode.FLINK_SQL =>
+        case DevelopmentMode.FLINK_SQL | DevelopmentMode.PYFLINK =>
           LfsOperator.mkCleanDirs(request.localWorkspace)
           HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
         case _ =>
@@ -58,7 +58,7 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques
     val mavenJars =
       execStep(2) {
         request.developmentMode match {
-          case DevelopmentMode.FLINK_SQL =>
+          case DevelopmentMode.FLINK_SQL | DevelopmentMode.PYFLINK =>
            val mavenArts = MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
            mavenArts.map(_.getAbsolutePath) ++ request.dependencyInfo.extJarLibs
           case _ => List[String]()
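
Both hunks make the same change: PyFlink jobs now follow the Flink SQL
branch, getting a clean workspace plus Maven dependency resolution instead
of the empty default. The dispatch in isolation (enum and resolver are
stand-ins for DevelopmentMode and MavenTool):

    object BuildDispatchSketch {
      sealed trait DevMode
      case object FlinkSql   extends DevMode
      case object PyFlink    extends DevMode
      case object CustomCode extends DevMode

      // `resolved` is call-by-name, so resolution only runs on the SQL-like branch
      def mavenJars(mode: DevMode)(resolved: => List[String]): List[String] =
        mode match {
          case FlinkSql | PyFlink => resolved // PyFlink now resolves Maven deps too
          case _                  => List.empty
        }

      def main(args: Array[String]): Unit = {
        println(mavenJars(PyFlink)(List("/tmp/dep.jar")))           // List(/tmp/dep.jar)
        println(mavenJars(CustomCode)(sys.error("never resolved"))) // List()
      }
    }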
