This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch job-state
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/job-state by this push:
new a73f3514c [Bug] extract programArgs bug fixed
a73f3514c is described below
commit a73f3514c060ab33c282197cecec0cfb52145648
Author: benjobs <[email protected]>
AuthorDate: Sat Nov 11 11:46:08 2023 +0800
[Bug] extract programArgs bug fixed
---
.../apache/streampark/common/util/PropertiesUtils.scala | 13 ++++++++++++-
.../streampark/common/util/PropertiesUtilsTestCase.scala | 16 ++++++++++++++--
2 files changed, 26 insertions(+), 3 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 3098daa5d..35facd89f 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -322,7 +322,18 @@ object PropertiesUtils extends Logger {
}
}
programArgs += value.substring(1, value.length - 1)
- case _ => programArgs += v
+ case _ =>
+ val regexp = "(.*)='(.*)'$"
+ if (v.matches(regexp)) {
+ programArgs += v.replaceAll(regexp, "$1=$2")
+ } else {
+ val regexp = "(.*)=\"(.*)\"$"
+ if (v.matches(regexp)) {
+ programArgs += v.replaceAll(regexp, "$1=$2")
+ } else {
+ programArgs += v
+ }
+ }
}
}
}
diff --git
a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
index 995cfd2ec..827d0f11c 100644
---
a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
+++
b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
@@ -24,8 +24,20 @@ import scala.language.postfixOps
class PropertiesUtilsTestCase {
@Test def testExtractProgramArgs(): Unit = {
- val args =
- "mysql-sync-database \n--database employees \n--mysql-conf
hostname=127.0.0.1 \n--mysql-conf port=3306 \n--mysql-conf username=root
\n--mysql-conf password=123456 \n--mysql-conf database-name=employees
\n--including-tables 'test|test.*' \n--sink-conf fenodes=127.0.0.1:8030
\n--sink-conf username=root \n--sink-conf password= \n--sink-conf
jdbc-url=jdbc:mysql://127.0.0.1:9030 \n--sink-conf
sink.label-prefix=label\n--table-conf replication_num=1 "
+ val args = "mysql-sync-table \n" +
+ "--warehouse hdfs:///paimon \n" +
+ "--database test_db \n" +
+ "--table test_table \n" +
+ "--mysql-conf hostname=localhost \n" +
+ "--mysql-conf username=root \n" +
+ "--mysql-conf password=123456 \n" +
+ "--mysql-conf database-name='employees' \n" +
+ "--mysql-conf table-name='employees' \n" +
+ "--catalog-conf metastore=hive \n" +
+ "--catalog-conf uri=thrift://localhost:9083 \n" +
+ "--table-conf bucket=1 \n" +
+ "--table-conf changelog-producer=input \n" +
+ "--table-conf sink.parallelism=1"
val programArgs = new ArrayBuffer[String]()
programArgs ++= PropertiesUtils.extractArguments(args)
println(programArgs)