This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new ae9a9e16f [Feature] Flink remote support pyflink maven dependency
(#3124)
ae9a9e16f is described below
commit ae9a9e16ff177bbbcd447e01d25ff72e83c55d80
Author: ChengJie1053 <[email protected]>
AuthorDate: Sun Sep 17 13:26:00 2023 +0800
[Feature] Flink remote support pyflink maven dependency (#3124)
* [Feature] Flink remote support pyflink maven dependency
* Modify FlinkRemoteBuildPipeline
---
.../core/service/impl/AppBuildPipeServiceImpl.java | 2 +-
.../flink/client/trait/FlinkClientTrait.scala | 9 +++-
.../pipeline/impl/FlinkRemoteBuildPipeline.scala | 52 +++++++++++++++++++---
3 files changed, 54 insertions(+), 9 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index a1fb0b4e8..65d6650e2 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -467,7 +467,7 @@ public class AppBuildPipeServiceImpl
app.getLocalAppHome(),
mainClass,
flinkUserJar,
- app.isCustomCodeOrPyFlinkJob(),
+ app.isCustomCodeJob(),
app.getExecutionModeEnum(),
app.getDevelopmentMode(),
flinkEnv.getFlinkVersion(),
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 716c9b2a7..71c4456d3 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.conf.{ConfigConst,
Workspace}
import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode,
ExecutionMode, RestoreMode}
import org.apache.streampark.common.fs.FsOperator
-import org.apache.streampark.common.util.{DeflaterUtils, Logger,
SystemPropertyUtils}
+import org.apache.streampark.common.util.{DeflaterUtils, FileUtils, Logger,
SystemPropertyUtils}
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.FlinkClusterClient
import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobgraph.{JobGraph,
SavepointConfigOptions}
import org.apache.flink.util.FlinkException
import org.apache.flink.util.Preconditions.checkNotNull
+import java.util
import java.util.{Collections, List => JavaList, Map => JavaMap}
import scala.annotation.tailrec
@@ -240,6 +241,12 @@ trait FlinkClientTrait extends Logger {
if (!FsOperator.lfs.exists(pythonVenv)) {
throw new RuntimeException(s"$pythonVenv File does not exist")
}
+
+ val localLib: String =
s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
+ if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib))
{
+ flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
+ }
+
flinkConfig
// python.archives
.safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
index d6889caa2..e3250b7c0 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
@@ -17,10 +17,16 @@
package org.apache.streampark.flink.packer.pipeline.impl
-import org.apache.streampark.common.fs.LfsOperator
+import org.apache.streampark.common.enums.DevelopmentMode
+import org.apache.streampark.common.fs.{FsOperator, LfsOperator}
import org.apache.streampark.flink.packer.maven.MavenTool
import org.apache.streampark.flink.packer.pipeline._
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions._
+
/** Building pipeline for flink standalone session mode */
class FlinkRemoteBuildPipeline(request: FlinkRemotePerJobBuildRequest) extends
BuildPipeline {
@@ -43,13 +49,45 @@ class FlinkRemoteBuildPipeline(request:
FlinkRemotePerJobBuildRequest) extends B
// build flink job shaded jar
val shadedJar =
execStep(2) {
- val output = MavenTool.buildFatJar(
- request.mainClass,
- request.providedLibs,
- request.getShadedJarPath(request.workspace))
- logInfo(s"output shaded flink job jar: ${output.getAbsolutePath}")
- output
+ request.developmentMode match {
+ case DevelopmentMode.FLINK_SQL =>
+ val output = MavenTool.buildFatJar(
+ request.mainClass,
+ request.providedLibs,
+ request.getShadedJarPath(request.workspace))
+ logInfo(s"output shaded flink job jar:
${output.getAbsolutePath}")
+ output
+ case _ => new File(request.customFlinkUserJar);
+ }
+ }.getOrElse(throw getError.exception)
+
+ val mavenJars =
+ execStep(3) {
+ request.developmentMode match {
+ case DevelopmentMode.PYFLINK =>
+ val mavenArts =
MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts.asJava)
+ mavenArts.map(_.getAbsolutePath) ++
request.dependencyInfo.extJarLibs
+ case _ => List[String]()
+ }
}.getOrElse(throw getError.exception)
+
+ execStep(4) {
+ request.developmentMode match {
+ case DevelopmentMode.PYFLINK =>
+ mavenJars.foreach(
+ jar => {
+ val lfs: FsOperator = FsOperator.lfs
+ val lib = request.workspace.concat("/lib")
+ lfs.mkdirsIfNotExists(lib)
+ val originFile = new File(jar)
+ if (originFile.isFile) {
+ lfs.copy(originFile.getAbsolutePath, lib)
+ }
+
+ })
+ case _ =>
+ }
+ }.getOrElse(throw getError.exception)
ShadedBuildResponse(request.workspace, shadedJar.getAbsolutePath)
}