This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch args
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/args by this push:
     new e5797ae55 [Improve] method extractProgramArgs improvement
e5797ae55 is described below

commit e5797ae558a982631c16da2d9c9d43324301a63d
Author: benjobs <[email protected]>
AuthorDate: Sun Oct 29 10:10:21 2023 +0800

    [Improve] method extractProgramArgs improvement
---
 .../flink/client/impl/YarnApplicationClient.scala  |  6 +-
 .../flink/client/trait/FlinkClientTrait.scala      | 76 ++++------------------
 2 files changed, 16 insertions(+), 66 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index f5862a082..f7b38cdde 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -102,10 +102,8 @@ object YarnApplicationClient extends YarnClientTrait {
         throw new RuntimeException(s"$pyVenv File does not exist")
       }
 
-      val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
-      if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
-        flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
-      }
+      // including $app/lib
+      includingPipelineJars(submitRequest, flinkConfig)
 
       // yarn.ship-files
       val shipFiles = new util.ArrayList[String]()
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index c7651b883..470aecd1d 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -22,7 +22,7 @@ import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.enums.{ApplicationType, FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode}
 import org.apache.streampark.common.fs.FsOperator
-import org.apache.streampark.common.util.{DeflaterUtils, ExceptionUtils, FileUtils, Logger, SystemPropertyUtils, Utils}
+import org.apache.streampark.common.util.{DeflaterUtils, ExceptionUtils, FileUtils, Logger, PropertiesUtils, SystemPropertyUtils, Utils}
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -274,12 +274,8 @@ trait FlinkClientTrait extends Logger {
         if (!FsOperator.lfs.exists(pythonVenv)) {
           throw new RuntimeException(s"$pythonVenv File does not exist")
         }
-
-        val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
-        if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
-          flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
-        }
-
+        // including $app/lib
+        includingPipelineJars(submitRequest, flinkConfig)
         flinkConfig
           // python.archives
           .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
@@ -457,60 +453,7 @@ trait FlinkClientTrait extends Logger {
 
   private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = {
     val programArgs = new ArrayBuffer[String]()
-    val args = submitRequest.args
-
-    if (StringUtils.isNotBlank(args)) {
-      val multiChar = "\""
-      val array = args.split("\\s+")
-      if (!array.exists(_.startsWith(multiChar))) {
-        array.foreach(programArgs +=)
-      } else {
-        val argsArray = new ArrayBuffer[String]()
-        val tempBuffer = new ArrayBuffer[String]()
-
-        @tailrec
-        def processElement(index: Int, multi: Boolean): Unit = {
-
-          if (index == array.length) {
-            if (tempBuffer.nonEmpty) {
-              argsArray += tempBuffer.mkString(" ")
-            }
-            return
-          }
-
-          val next = index + 1
-          val elem = array(index).trim
-
-          if (elem.isEmpty) {
-            processElement(next, multi = false)
-          } else {
-            if (multi) {
-              if (elem.endsWith(multiChar)) {
-                tempBuffer += elem.dropRight(1)
-                argsArray += tempBuffer.mkString(" ")
-                tempBuffer.clear()
-                processElement(next, multi = false)
-              } else {
-                tempBuffer += elem
-                processElement(next, multi)
-              }
-            } else {
-              val until = if (elem.endsWith(multiChar)) 1 else 0
-              if (elem.startsWith(multiChar)) {
-                tempBuffer += elem.drop(1).dropRight(until)
-                processElement(next, multi = true)
-              } else {
-                argsArray += elem.dropRight(until)
-                processElement(next, multi = false)
-              }
-            }
-          }
-        }
-
-        processElement(0, multi = false)
-        argsArray.foreach(x => programArgs += x)
-      }
-    }
+    programArgs ++= PropertiesUtils.extractArguments(submitRequest.args)
 
     if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
 
@@ -546,7 +489,7 @@ trait FlinkClientTrait extends Logger {
       programArgs.add("-py")
       programArgs.add(submitRequest.userJarFile.getAbsolutePath)
     }
-    programArgs.toList.asJava
+    Lists.newArrayList(programArgs: _*)
   }
 
   private[this] def applyConfiguration(
@@ -650,4 +593,13 @@ trait FlinkClientTrait extends Logger {
     clientWrapper.triggerSavepoint(jobID, savepointPath, savepointRequest.nativeFormat).get()
   }
 
+  private[client] def includingPipelineJars(
+      submitRequest: SubmitRequest,
+      flinkConfig: Configuration) = {
+    val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
+    if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
+      flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
+    }
+  }
+
 }

Reply via email to