This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch args
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 778628175bac71956be9d075bb2df91eac2cacd0
Author: benjobs <[email protected]>
AuthorDate: Sun Oct 29 00:47:16 2023 +0800

    [Improve] packageArgs improvement
---
 .../streampark/common/conf/FlinkVersion.scala      |  2 +
 .../streampark/common/util/PropertiesUtils.scala   | 26 ++++++-
 .../common/util/PropertiesUtilsTestCase.scala      | 89 +++++-----------------
 .../flink/client/trait/FlinkClientTrait.scala      | 48 +++++++-----
 4 files changed, 73 insertions(+), 92 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index c05e28a3e..5455a6d5d 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -59,6 +59,8 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg
     lib
   }
 
+  lazy val flinkLibs: List[NetURL] = flinkLib.listFiles().map(_.toURI.toURL).toList
+
   lazy val version: String = {
     val flinkVersion = new AtomicReference[String]
     val cmd = List(
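For context: the new flinkLibs value exposes every file under the Flink distribution's lib directory as a java.net.URL (NetURL is presumably an alias for java.net.URL), ready to be handed to the PackagedProgram builder as user classpaths. A minimal sketch of the same idea, using a hypothetical local Flink home:

    import java.io.File
    import java.net.URL

    // Hypothetical FLINK_HOME/lib; flinkLibs does the same for the configured flinkHome.
    val flinkLib = new File("/opt/flink/lib")
    val flinkLibs: List[URL] = flinkLib.listFiles().map(_.toURI.toURL).toList
    // e.g. List(file:/opt/flink/lib/flink-dist-1.17.1.jar, ...)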
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 706b3b090..11d0d7c0b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -30,7 +30,7 @@ import java.util.regex.Pattern
 import scala.collection.JavaConverters._
 import scala.collection.convert.ImplicitConversions._
 import scala.collection.mutable
-import scala.collection.mutable.{Map => MutableMap}
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
 
 object PropertiesUtils extends Logger {
 
@@ -305,6 +305,30 @@ object PropertiesUtils extends Logger {
     }
   }
 
+  @Nonnull def extractArguments(args: String): List[String] = {
+    val programArgs = new ArrayBuffer[String]()
+    if (StringUtils.isNotEmpty(args)) {
+      val array = args.split("\\s+")
+      val iter = array.iterator
+      while (iter.hasNext) {
+        val v = iter.next()
+        val p = v.take(1)
+        p match {
+          case "'" | "\"" =>
+            var value = v
+            if (!v.endsWith(p)) {
+              while (!value.endsWith(p) && iter.hasNext) {
+                value += s" ${iter.next()}"
+              }
+            }
+            programArgs += value.replaceAll(s"^$p|$p$$", "")
+          case _ => programArgs += v
+        }
+      }
+    }
+    programArgs.toList
+  }
+
   @Nonnull def extractDynamicPropertiesAsJava(properties: String): JavaMap[String, String] =
     new JavaMap[String, String](extractDynamicProperties(properties).asJava)
 
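Not part of the patch, but as a rough illustration of how the new extractArguments behaves: whitespace still separates arguments, while a value wrapped in single or double quotes is kept together and returned without the surrounding quotes.

    // Illustrative only; assumes the extractArguments implementation shown above.
    val parsed = PropertiesUtils.extractArguments(
      "--host localhost:8123 --sql 'select * from t where a > 1'")
    // parsed == List("--host", "localhost:8123", "--sql", "select * from t where a > 1")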
diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
index 01e1b2743..9715327f8 100644
--- a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
+++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
@@ -16,83 +16,32 @@
  */
 package org.apache.streampark.common.util
 
-import org.apache.commons.lang3.StringUtils
 import org.junit.jupiter.api.{Assertions, Test}
 
-import scala.annotation.tailrec
-import scala.collection.mutable.ArrayBuffer
 import scala.language.postfixOps
 
 class PropertiesUtilsTestCase {
 
   @Test def testExtractProgramArgs(): Unit = {
-    val argsStr = "--host localhost:8123\n" +
-      "--sql \"insert into table_a select * from table_b\"\n" +
-      "--c d\r\n" +
-      "--including-tables \"BASE_CARD_ETPS|BASE_CHECKED_STAT\"\n"
-    val programArgs = new ArrayBuffer[String]()
-    if (StringUtils.isNotBlank(argsStr)) {
-      val multiChar = "\""
-      val array = argsStr.split("\\s+")
-      if (!array.exists(_.startsWith(multiChar))) {
-        array.foreach(
-          x => {
-            if (x.trim.nonEmpty) {
-              programArgs += x
-            }
-          })
-      } else {
-        val argsArray = new ArrayBuffer[String]()
-        val tempBuffer = new ArrayBuffer[String]()
-
-        @tailrec
-        def processElement(index: Int, multi: Boolean): Unit = {
-
-          if (index == array.length) {
-            if (tempBuffer.nonEmpty) {
-              argsArray += tempBuffer.mkString(" ")
-            }
-            return
-          }
-
-          val next = index + 1
-          val elem = array(index).trim
-          val until = if (elem.endsWith(multiChar)) 1 else 0
-
-          if (elem.isEmpty) {
-            processElement(next, multi = false)
-          } else {
-            if (multi) {
-              if (elem.endsWith(multiChar)) {
-                tempBuffer += elem.dropRight(1)
-                argsArray += tempBuffer.mkString(" ")
-                tempBuffer.clear()
-                processElement(next, multi = false)
-              } else {
-                tempBuffer += elem
-                processElement(next, multi)
-              }
-            } else {
-              if (elem.startsWith(multiChar)) {
-                tempBuffer += elem.drop(1).dropRight(until)
-                processElement(next, multi = true)
-              } else {
-                argsArray += elem.dropRight(until)
-                processElement(next, multi = false)
-              }
-            }
-          }
-        }
-
-        processElement(0, multi = false)
-        argsArray.foreach(x => programArgs += x)
-      }
-    }
-
-    Assertions.assertEquals("localhost:8123", programArgs(1))
-    Assertions.assertEquals("insert into table_a select * from table_b", 
programArgs(3))
-    Assertions.assertEquals("d", programArgs(5))
-    Assertions.assertEquals("BASE_CARD_ETPS|BASE_CHECKED_STAT", programArgs(7))
+    val args =
+      "mysql-sync-database " +
+        "--database employees " +
+        "--mysql-conf hostname=127.0.0.1 " +
+        "--mysql-conf port=3306 " +
+        "--mysql-conf username=root " +
+        "--mysql-conf password=123456 " +
+        "--mysql-conf database-name=employees " +
+        "--including-tables 'test|test.*' " +
+        "--excluding-tables \"emp_*\" " +
+        "--query 'select * from employees where age > 20' " +
+        "--sink-conf fenodes=127.0.0.1:8030 " +
+        "--sink-conf username=root " +
+        "--sink-conf password= " +
+        "--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 " +
+        "--sink-conf sink.label-prefix=label" +
+        "--table-conf replication_num=1"
+    val programArgs = PropertiesUtils.extractArguments(args)
+    println(programArgs)
   }
 
   @Test def testDynamicProperties(): Unit = {
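If the new test were to assert on the result instead of printing it, checks along these lines should hold for the quoted values above (illustrative additions, not part of the commit):

    // Quoted values arrive as single arguments with the surrounding quotes stripped.
    Assertions.assertTrue(programArgs.contains("test|test.*"))
    Assertions.assertTrue(programArgs.contains("emp_*"))
    Assertions.assertTrue(programArgs.contains("select * from employees where age > 20"))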
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 75a05326f..271d3d0df 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -237,27 +237,11 @@ trait FlinkClientTrait extends Logger {
   private[client] def getJobGraph(
       submitRequest: SubmitRequest,
       flinkConfig: Configuration): (PackagedProgram, JobGraph) = {
-    if (submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK) {
-      val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
-      if (!FsOperator.lfs.exists(pythonVenv)) {
-        throw new RuntimeException(s"$pythonVenv File does not exist")
-      }
-
-      val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
-      if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
-        flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
-      }
 
-      flinkConfig
-        // python.archives
-        .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
-        // python.client.executable
-        .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
-        // python.executable
-        .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
-    }
-
-    val packageProgram = PackagedProgram.newBuilder
+    val pkgBuilder = PackagedProgram.newBuilder
+      .setUserClassPaths(
+        Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*)
+      )
       .setArguments(
         flinkConfig
           .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
@@ -266,8 +250,30 @@ trait FlinkClientTrait extends Logger {
         flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
       )
       .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
-      .build()
 
+    submitRequest.developmentMode match {
+      case FlinkDevelopmentMode.PYFLINK =>
+        val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
+        if (!FsOperator.lfs.exists(pythonVenv)) {
+          throw new RuntimeException(s"$pythonVenv File does not exist")
+        }
+
+        val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
+        if (FileUtils.exists(localLib) && 
FileUtils.directoryNotBlank(localLib)) {
+          flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
+        }
+
+        flinkConfig
+          // python.archives
+          .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
+          // python.client.executable
+          .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
+          // python.executable
+          .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
+      case _ =>
+    }
+
+    val packageProgram = pkgBuilder.build()
     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,
       flinkConfig,

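One detail worth noting in the new builder chain: PackagedProgram.Builder#setUserClassPaths takes a java.util list of URLs, so the Scala List produced by flinkLibs is converted with a Guava-style Lists.newArrayList varargs call. A self-contained sketch of that conversion, with a hypothetical jar path:

    import java.net.URL
    import com.google.common.collect.Lists // the diff's import may be a shaded Guava variant

    // Scala List[URL] -> java.util.List[URL], as done for setUserClassPaths in the diff.
    val libs: List[URL] = List(new URL("file:/opt/flink/lib/flink-dist.jar"))
    val javaLibs: java.util.List[URL] = Lists.newArrayList(libs: _*)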