This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch args
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/args by this push:
new e5797ae55 [Improve] method extractProgramArgs improvement
e5797ae55 is described below
commit e5797ae558a982631c16da2d9c9d43324301a63d
Author: benjobs <[email protected]>
AuthorDate: Sun Oct 29 10:10:21 2023 +0800
[Improve] method extractProgramArgs improvement
---
.../flink/client/impl/YarnApplicationClient.scala | 6 +-
.../flink/client/trait/FlinkClientTrait.scala | 76 ++++------------------
2 files changed, 16 insertions(+), 66 deletions(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index f5862a082..f7b38cdde 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -102,10 +102,8 @@ object YarnApplicationClient extends YarnClientTrait {
throw new RuntimeException(s"$pyVenv File does not exist")
}
- val localLib: String =
s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
- if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib))
{
- flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
- }
+ // including $app/lib
+ includingPipelineJars(submitRequest, flinkConfig)
// yarn.ship-files
val shipFiles = new util.ArrayList[String]()
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index c7651b883..470aecd1d 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -22,7 +22,7 @@ import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.enums.{ApplicationType,
FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode}
import org.apache.streampark.common.fs.FsOperator
-import org.apache.streampark.common.util.{DeflaterUtils, ExceptionUtils,
FileUtils, Logger, SystemPropertyUtils, Utils}
+import org.apache.streampark.common.util.{DeflaterUtils, ExceptionUtils,
FileUtils, Logger, PropertiesUtils, SystemPropertyUtils, Utils}
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.FlinkClusterClient
import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -274,12 +274,8 @@ trait FlinkClientTrait extends Logger {
if (!FsOperator.lfs.exists(pythonVenv)) {
throw new RuntimeException(s"$pythonVenv File does not exist")
}
-
- val localLib: String =
s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
- if (FileUtils.exists(localLib) &&
FileUtils.directoryNotBlank(localLib)) {
- flinkConfig.safeSet(PipelineOptions.JARS,
util.Arrays.asList(localLib))
- }
-
+ // including $app/lib
+ includingPipelineJars(submitRequest, flinkConfig)
flinkConfig
// python.archives
.safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
@@ -457,60 +453,7 @@ trait FlinkClientTrait extends Logger {
private[this] def extractProgramArgs(submitRequest: SubmitRequest):
JavaList[String] = {
val programArgs = new ArrayBuffer[String]()
- val args = submitRequest.args
-
- if (StringUtils.isNotBlank(args)) {
- val multiChar = "\""
- val array = args.split("\\s+")
- if (!array.exists(_.startsWith(multiChar))) {
- array.foreach(programArgs +=)
- } else {
- val argsArray = new ArrayBuffer[String]()
- val tempBuffer = new ArrayBuffer[String]()
-
- @tailrec
- def processElement(index: Int, multi: Boolean): Unit = {
-
- if (index == array.length) {
- if (tempBuffer.nonEmpty) {
- argsArray += tempBuffer.mkString(" ")
- }
- return
- }
-
- val next = index + 1
- val elem = array(index).trim
-
- if (elem.isEmpty) {
- processElement(next, multi = false)
- } else {
- if (multi) {
- if (elem.endsWith(multiChar)) {
- tempBuffer += elem.dropRight(1)
- argsArray += tempBuffer.mkString(" ")
- tempBuffer.clear()
- processElement(next, multi = false)
- } else {
- tempBuffer += elem
- processElement(next, multi)
- }
- } else {
- val until = if (elem.endsWith(multiChar)) 1 else 0
- if (elem.startsWith(multiChar)) {
- tempBuffer += elem.drop(1).dropRight(until)
- processElement(next, multi = true)
- } else {
- argsArray += elem.dropRight(until)
- processElement(next, multi = false)
- }
- }
- }
- }
-
- processElement(0, multi = false)
- argsArray.foreach(x => programArgs += x)
- }
- }
+ programArgs ++= PropertiesUtils.extractArguments(submitRequest.args)
if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
@@ -546,7 +489,7 @@ trait FlinkClientTrait extends Logger {
programArgs.add("-py")
programArgs.add(submitRequest.userJarFile.getAbsolutePath)
}
- programArgs.toList.asJava
+ Lists.newArrayList(programArgs: _*)
}
private[this] def applyConfiguration(
@@ -650,4 +593,13 @@ trait FlinkClientTrait extends Logger {
clientWrapper.triggerSavepoint(jobID, savepointPath,
savepointRequest.nativeFormat).get()
}
+ private[client] def includingPipelineJars(
+ submitRequest: SubmitRequest,
+ flinkConfig: Configuration) = {
+ val localLib: String =
s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
+ if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
+ flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
+ }
+ }
+
}