This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch job-state
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/job-state by this push:
     new a73f3514c [Bug] extract programArgs bug fixed
a73f3514c is described below

commit a73f3514c060ab33c282197cecec0cfb52145648
Author: benjobs <[email protected]>
AuthorDate: Sat Nov 11 11:46:08 2023 +0800

    [Bug] extract programArgs bug fixed
---
 .../apache/streampark/common/util/PropertiesUtils.scala  | 13 ++++++++++++-
 .../streampark/common/util/PropertiesUtilsTestCase.scala | 16 ++++++++++++++--
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 3098daa5d..35facd89f 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -322,7 +322,18 @@ object PropertiesUtils extends Logger {
               }
             }
             programArgs += value.substring(1, value.length - 1)
-          case _ => programArgs += v
+          case _ =>
+            val regexp = "(.*)='(.*)'$"
+            if (v.matches(regexp)) {
+              programArgs += v.replaceAll(regexp, "$1=$2")
+            } else {
+              val regexp = "(.*)=\"(.*)\"$"
+              if (v.matches(regexp)) {
+                programArgs += v.replaceAll(regexp, "$1=$2")
+              } else {
+                programArgs += v
+              }
+            }
         }
       }
     }
diff --git 
a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
 
b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
index 995cfd2ec..827d0f11c 100644
--- 
a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
+++ 
b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
@@ -24,8 +24,20 @@ import scala.language.postfixOps
 class PropertiesUtilsTestCase {
 
   @Test def testExtractProgramArgs(): Unit = {
-    val args =
-      "mysql-sync-database \n--database employees \n--mysql-conf 
hostname=127.0.0.1 \n--mysql-conf port=3306 \n--mysql-conf username=root 
\n--mysql-conf password=123456 \n--mysql-conf database-name=employees 
\n--including-tables 'test|test.*' \n--sink-conf fenodes=127.0.0.1:8030 
\n--sink-conf username=root \n--sink-conf password= \n--sink-conf 
jdbc-url=jdbc:mysql://127.0.0.1:9030 \n--sink-conf 
sink.label-prefix=label\n--table-conf replication_num=1 "
+    val args = "mysql-sync-table \n" +
+      "--warehouse hdfs:///paimon \n" +
+      "--database test_db \n" +
+      "--table test_table \n" +
+      "--mysql-conf hostname=localhost \n" +
+      "--mysql-conf username=root \n" +
+      "--mysql-conf password=123456 \n" +
+      "--mysql-conf database-name='employees' \n" +
+      "--mysql-conf table-name='employees' \n" +
+      "--catalog-conf metastore=hive \n" +
+      "--catalog-conf uri=thrift://localhost:9083 \n" +
+      "--table-conf bucket=1 \n" +
+      "--table-conf changelog-producer=input \n" +
+      "--table-conf sink.parallelism=1"
     val programArgs = new ArrayBuffer[String]()
     programArgs ++= PropertiesUtils.extractArguments(args)
     println(programArgs)

Reply via email to