This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch classpaths in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 7997c5dd1a1a50c81b61fd2ac015519bde9b589f Author: benjobs <[email protected]> AuthorDate: Sat Nov 4 23:24:28 2023 +0800 [Improve] submit flink job userclassPaths improvement --- .../flink/client/bean/SubmitRequest.scala | 11 +++++++-- .../flink/client/impl/YarnApplicationClient.scala | 4 ---- .../flink/client/trait/FlinkClientTrait.scala | 27 +++++----------------- 3 files changed, 15 insertions(+), 27 deletions(-) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala index 25e17120d..1fe9f98c8 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala @@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointResto import javax.annotation.Nullable import java.io.File +import java.net.URL import java.util.{Map => JavaMap} import scala.collection.convert.ImplicitConversions._ @@ -55,7 +56,8 @@ case class SubmitRequest( @Nullable k8sSubmitParam: KubernetesSubmitParam, @Nullable extraParameter: JavaMap[String, Any]) { - lazy val appProperties: Map[String, String] = getParameterMap(KEY_FLINK_PROPERTY_PREFIX) + private[this] lazy val appProperties: Map[String, String] = getParameterMap( + KEY_FLINK_PROPERTY_PREFIX) lazy val appOption: Map[String, String] = getParameterMap(KEY_FLINK_OPTION_PREFIX) @@ -68,9 +70,14 @@ case class SubmitRequest( lazy val effectiveAppName: String = if (this.appName == null) appProperties(KEY_FLINK_APP_NAME) else this.appName + lazy val localLibs: List[URL] = { + val path = s"${Workspace.local.APP_WORKSPACE}/$id/lib" + Try(new File(path).listFiles().map(_.toURI.toURL).toList).getOrElse(List.empty[URL]) + } + lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString - lazy val allowNonRestoredState = Try( + lazy val allowNonRestoredState: Boolean = Try( properties.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean) .getOrElse(false) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala index f7b38cdde..3470e8e67 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala @@ -101,10 +101,6 @@ object YarnApplicationClient extends YarnClientTrait { if (!FsOperator.hdfs.exists(pyVenv)) { throw new RuntimeException(s"$pyVenv File does not exist") } - - // including $app/lib - includingPipelineJars(submitRequest, flinkConfig) - // yarn.ship-files val shipFiles = new util.ArrayList[String]() shipFiles.add(submitRequest.userJarFile.getParentFile.getAbsolutePath) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 435d882c7..ffc6b47a8 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -20,7 +20,7 @@ package org.apache.streampark.flink.client.`trait` import org.apache.streampark.common.Constant import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.conf.Workspace -import org.apache.streampark.common.enums.{ApplicationType, FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode} +import org.apache.streampark.common.enums._ import org.apache.streampark.common.fs.FsOperator import org.apache.streampark.common.util._ import org.apache.streampark.flink.client.bean._ @@ -43,6 +43,7 @@ import org.apache.flink.util.FlinkException import org.apache.flink.util.Preconditions.checkNotNull import java.io.File +import java.net.URL import java.util import java.util.{Collections, List => JavaList, Map => JavaMap} @@ -256,9 +257,10 @@ trait FlinkClientTrait extends Logger { flinkConfig: Configuration): (PackagedProgram, JobGraph) = { val pkgBuilder = PackagedProgram.newBuilder - .setUserClassPaths( - Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*) - ) + + val classPaths = submitRequest.flinkVersion.flinkLibs ++ submitRequest.localLibs + pkgBuilder + .setUserClassPaths(Lists.newArrayList(classPaths: _*)) .setEntryPointClassName( flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get() ) @@ -275,14 +277,6 @@ trait FlinkClientTrait extends Logger { if (!FsOperator.lfs.exists(pythonVenv)) { throw new RuntimeException(s"$pythonVenv File does not exist") } - // including $app/lib - val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib" - if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) { - val localLibUrl = new File(localLib).listFiles().map(_.toURI.toURL).toList - pkgBuilder.setUserClassPaths( - Lists.newArrayList(localLibUrl: _*) - ) - } flinkConfig // python.archives .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv) @@ -600,13 +594,4 @@ trait FlinkClientTrait extends Logger { clientWrapper.triggerSavepoint(jobID, savepointPath, savepointRequest.nativeFormat).get() } - private[client] def includingPipelineJars( - submitRequest: SubmitRequest, - flinkConfig: Configuration) = { - val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib" - if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) { - flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib)) - } - } - }
