This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new ae9a9e16f [Feature] Flink remote support pyflink maven dependency (#3124)
ae9a9e16f is described below

commit ae9a9e16ff177bbbcd447e01d25ff72e83c55d80
Author: ChengJie1053 <[email protected]>
AuthorDate: Sun Sep 17 13:26:00 2023 +0800

    [Feature] Flink remote support pyflink maven dependency (#3124)
    
    * [Feature] Flink remote support pyflink maven dependency
    
    * Modify FlinkRemoteBuildPipeline
---
 .../core/service/impl/AppBuildPipeServiceImpl.java |  2 +-
 .../flink/client/trait/FlinkClientTrait.scala      |  9 +++-
 .../pipeline/impl/FlinkRemoteBuildPipeline.scala   | 52 +++++++++++++++++++---
 3 files changed, 54 insertions(+), 9 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index a1fb0b4e8..65d6650e2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -467,7 +467,7 @@ public class AppBuildPipeServiceImpl
                 app.getLocalAppHome(),
                 mainClass,
                 flinkUserJar,
-                app.isCustomCodeOrPyFlinkJob(),
+                app.isCustomCodeJob(),
                 app.getExecutionModeEnum(),
                 app.getDevelopmentMode(),
                 flinkEnv.getFlinkVersion(),
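
Note on the hunk above: swapping isCustomCodeOrPyFlinkJob() for
isCustomCodeJob() means PyFlink jobs are no longer treated like prebuilt
custom-code jobs when the build request is assembled; per the pipeline
changes below, they instead get their maven dependencies resolved during
the build. A minimal Scala sketch of that branching, using an illustrative
enum rather than StreamPark's actual API:

    object JobTypeSketch {
      sealed trait DevelopmentMode
      case object CUSTOM_CODE extends DevelopmentMode
      case object PYFLINK extends DevelopmentMode
      case object FLINK_SQL extends DevelopmentMode

      // Custom-code jobs ship a prebuilt user jar; PyFlink (and SQL) jobs
      // now fall through to dependency resolution in the build pipeline.
      def needsDependencyResolution(mode: DevelopmentMode): Boolean =
        mode match {
          case CUSTOM_CODE => false
          case PYFLINK | FLINK_SQL => true
        }
    }
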
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 716c9b2a7..71c4456d3 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.conf.{ConfigConst, Workspace}
 import org.apache.streampark.common.conf.ConfigConst._
 import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode, RestoreMode}
 import org.apache.streampark.common.fs.FsOperator
-import org.apache.streampark.common.util.{DeflaterUtils, Logger, SystemPropertyUtils}
+import org.apache.streampark.common.util.{DeflaterUtils, FileUtils, Logger, SystemPropertyUtils}
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
 import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull
 
+import java.util
 import java.util.{Collections, List => JavaList, Map => JavaMap}
 
 import scala.annotation.tailrec
@@ -240,6 +241,12 @@ trait FlinkClientTrait extends Logger {
       if (!FsOperator.lfs.exists(pythonVenv)) {
         throw new RuntimeException(s"$pythonVenv File does not exist")
       }
+
+      val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
+      if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
+        flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
+      }
+
       flinkConfig
         // python.archives
         .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
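
The added block above registers the job's local lib directory with
PipelineOptions.JARS when it exists and is not empty. A self-contained
Scala sketch of the same idea, collecting individual jar paths with plain
java.io instead of StreamPark's FileUtils/FsOperator helpers (the directory
layout is assumed for illustration):

    import java.io.File
    import java.util.{Collections, List => JavaList}
    import scala.collection.JavaConverters._

    object LibJarsSketch {
      // Return absolute paths of the *.jar files under libDir, or an
      // empty list when the directory is missing or unreadable.
      def pipelineJars(libDir: String): JavaList[String] = {
        val dir = new File(libDir)
        if (!dir.isDirectory) Collections.emptyList[String]()
        else
          Option(dir.listFiles())
            .getOrElse(Array.empty[File])
            .filter(f => f.isFile && f.getName.endsWith(".jar"))
            .map(_.getAbsolutePath)
            .toList
            .asJava
      }
    }
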
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
index d6889caa2..e3250b7c0 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
@@ -17,10 +17,16 @@
 
 package org.apache.streampark.flink.packer.pipeline.impl
 
-import org.apache.streampark.common.fs.LfsOperator
+import org.apache.streampark.common.enums.DevelopmentMode
+import org.apache.streampark.common.fs.{FsOperator, LfsOperator}
 import org.apache.streampark.flink.packer.maven.MavenTool
 import org.apache.streampark.flink.packer.pipeline._
 
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions._
+
 /** Building pipeline for flink standalone session mode */
 class FlinkRemoteBuildPipeline(request: FlinkRemotePerJobBuildRequest) extends BuildPipeline {
 
@@ -43,13 +49,45 @@ class FlinkRemoteBuildPipeline(request: FlinkRemotePerJobBuildRequest) extends B
       // build flink job shaded jar
       val shadedJar =
         execStep(2) {
-          val output = MavenTool.buildFatJar(
-            request.mainClass,
-            request.providedLibs,
-            request.getShadedJarPath(request.workspace))
-          logInfo(s"output shaded flink job jar: ${output.getAbsolutePath}")
-          output
+          request.developmentMode match {
+            case DevelopmentMode.FLINK_SQL =>
+              val output = MavenTool.buildFatJar(
+                request.mainClass,
+                request.providedLibs,
+                request.getShadedJarPath(request.workspace))
+              logInfo(s"output shaded flink job jar: ${output.getAbsolutePath}")
+              output
+            case _ => new File(request.customFlinkUserJar);
+          }
+        }.getOrElse(throw getError.exception)
+
+      val mavenJars =
+        execStep(3) {
+          request.developmentMode match {
+            case DevelopmentMode.PYFLINK =>
+              val mavenArts = MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts.asJava)
+              mavenArts.map(_.getAbsolutePath) ++ request.dependencyInfo.extJarLibs
+            case _ => List[String]()
+          }
         }.getOrElse(throw getError.exception)
+
+      execStep(4) {
+        request.developmentMode match {
+          case DevelopmentMode.PYFLINK =>
+            mavenJars.foreach(
+              jar => {
+                val lfs: FsOperator = FsOperator.lfs
+                val lib = request.workspace.concat("/lib")
+                lfs.mkdirsIfNotExists(lib)
+                val originFile = new File(jar)
+                if (originFile.isFile) {
+                  lfs.copy(originFile.getAbsolutePath, lib)
+                }
+
+              })
+          case _ =>
+        }
+      }.getOrElse(throw getError.exception)
       ShadedBuildResponse(request.workspace, shadedJar.getAbsolutePath)
     }
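
Taken together, the FlinkRemoteBuildPipeline changes make the steps
mode-specific: step 2 shades a fat jar only for FLINK_SQL jobs, step 3
resolves maven artifacts for PYFLINK jobs, and step 4 copies the resolved
jars into <workspace>/lib, presumably the same per-job lib directory the
client change above hands to PipelineOptions.JARS. A hedged sketch of step
4 using java.nio rather than StreamPark's FsOperator, purely as a
self-contained illustration:

    import java.io.File
    import java.nio.file.{Files, Paths, StandardCopyOption}

    object CopyLibsSketch {
      // Copy each resolved dependency jar into <workspace>/lib, creating
      // the directory on first use and overwriting stale copies.
      def copyToLib(jars: Seq[String], workspace: String): Unit = {
        val lib = Paths.get(workspace, "lib")
        Files.createDirectories(lib)
        jars.map(new File(_)).filter(_.isFile).foreach { jar =>
          Files.copy(
            jar.toPath,
            lib.resolve(jar.getName),
            StandardCopyOption.REPLACE_EXISTING)
        }
      }
    }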
 
