This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 1599d8deb [Bug] extractProgramArgs parsing args parameter error (#1652)
1599d8deb is described below
commit 1599d8deb66eb94375cb61e37af9f70dca9ded21
Author: fantasticKe <[email protected]>
AuthorDate: Tue Sep 20 18:13:37 2022 +0800
[Bug] extractProgramArgs parsing args parameter error (#1652)
* fixed(streampark-flink-submit-core):extractProgramArgs parsing args
parameter error
---
.../flink/submit/trait/FlinkSubmitTrait.scala | 72 +++++++++++++++++---
.../flink/submit/test/ParameterTestCase.scala | 78 ++++++++++++++++++++--
2 files changed, 134 insertions(+), 16 deletions(-)
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 57ec45112..3a9409268 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -18,16 +18,9 @@
package org.apache.streampark.flink.submit.`trait`
import com.google.common.collect.Lists
-import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.conf.Workspace
-import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode, ResolveOrder}
-import org.apache.streampark.common.util.{Logger, SystemPropertyUtils, Utils}
-import org.apache.streampark.flink.core.conf.FlinkRunOption
-import org.apache.streampark.flink.core.{ClusterClient => ClusterClientWrapper}
-import org.apache.streampark.flink.submit.bean._
import org.apache.commons.cli.{CommandLine, Options}
import org.apache.commons.collections.MapUtils
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.JobID
import org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines
import org.apache.flink.client.cli._
@@ -37,10 +30,17 @@ import org.apache.flink.configuration._
import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
import org.apache.flink.util.FlinkException
import org.apache.flink.util.Preconditions.checkNotNull
-import java.util.{Map => JavaMap}
+import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.Workspace
+import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode, ResolveOrder}
+import org.apache.streampark.common.util.{Logger, SystemPropertyUtils, Utils}
+import org.apache.streampark.flink.core.conf.FlinkRunOption
+import org.apache.streampark.flink.core.{ClusterClient => ClusterClientWrapper}
+import org.apache.streampark.flink.submit.bean._
+
import java.io.File
import java.util.concurrent.TimeUnit
-import java.util.{Collections, List => JavaList}
+import java.util.{Collections, List => JavaList, Map => JavaMap}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -356,7 +356,57 @@ trait FlinkSubmitTrait extends Logger {
val programArgs = new ArrayBuffer[String]()
- Try(submitRequest.args.split("\\s+")).getOrElse(Array()).foreach(x => if (x.nonEmpty) programArgs += x)
+ if (StringUtils.isNotEmpty(submitRequest.args)) {
+
+ val array = submitRequest.args.split("\\s")
+ val argsArray = new ArrayBuffer[String]()
+ val tempBuffer = new ArrayBuffer[String]()
+
+ def processElement(index: Int, num: Int): Unit = {
+
+ if (index == array.length) {
+ if (tempBuffer.nonEmpty) {
+ argsArray += tempBuffer.mkString(" ")
+ }
+ return
+ }
+
+ val next = index + 1
+ val elem = array(index)
+
+ if (elem.trim.nonEmpty) {
+ if (num == 0) {
+ if (elem.startsWith("'")) {
+ tempBuffer += elem
+ processElement(next, 1)
+ } else if (elem.startsWith("\"")) {
+ tempBuffer += elem
+ processElement(next, 2)
+ } else {
+ argsArray += elem
+ processElement(next, 0)
+ }
+ } else {
+ tempBuffer += elem
+ val end1 = elem.endsWith("'") && num == 1
+ val end2 = elem.endsWith("\"") && num == 2
+ if (end1 || end2) {
+ argsArray += tempBuffer.mkString(" ")
+ tempBuffer.clear()
+ processElement(next, 0)
+ } else {
+ processElement(next, num)
+ }
+ }
+ } else {
+ tempBuffer += elem
+ processElement(next, 0)
+ }
+ }
+
+ processElement(0, 0)
+ argsArray.foreach(x => programArgs += x.trim.replaceAll("^[\"|']|[\"|']$", ""))
+ }
if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
programArgs += PARAM_KEY_FLINK_CONF
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
index a74fa5826..e54448381 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
@@ -19,9 +19,14 @@ package org.apache.streampark.flink.submit.test
import org.apache.flink.api.java.utils.ParameterTool
import org.junit.jupiter.api.{Assertions, Test}
+import java.util.regex.Pattern
+import scala.annotation.tailrec
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
class ParameterTestCase {
- @Test def readArgs():Unit = {
+ @Test def readArgs(): Unit = {
val arg = Array(
"--flink.deployment.option.parallelism",
"10"
@@ -36,10 +41,73 @@ class ParameterTestCase {
)
val param =
ParameterTool.fromArgs(arg).mergeWith(ParameterTool.fromArgs(args))
- Assertions.assertEquals(param.get("flink.home"), "hdfs://nameservice1/streampark/flink/flink-1.11.1")
- Assertions.assertEquals(param.get("app.name"), "testApp123")
- Assertions.assertEquals(param.get("flink.deployment.option.parallelism"), "5")
-
+ Assertions.assertEquals("hdfs://nameservice1/streampark/flink/flink-1.11.1", param.get("flink.home"))
+ Assertions.assertEquals("testApp123", param.get("app.name"))
+ Assertions.assertEquals("5", param.get("flink.deployment.option.parallelism"))
}
+ @Test def testExtractProgramArgs(): Unit = {
+
+ val argsStr = "--url localhost:8123 \n" +
+ "--insertSql1 'insert \'\'into default.test values (?,?,?,?,?)' \n" +
+ "--insertSql2 'insert into default.test values (1,2,3,4,\"111\")'\n "+
+ "--insertSql2 \"insert into default.test values (1,2,3,4,\'111\')\" \n" +
+ "--insertSql2 'insert into default.test values (1,2,3,4,\"111\", \'22\', \'\')'"
+
+ val array = argsStr.split("\\s")
+ val argsArray = new ArrayBuffer[String]()
+ val tempBuffer = new ArrayBuffer[String]()
+
+ def processElement(index: Int, num: Int): Unit = {
+
+ if (index == array.length) {
+ if (tempBuffer.nonEmpty) {
+ argsArray += tempBuffer.mkString(" ")
+ }
+ return
+ }
+
+ val next = index + 1
+ val elem = array(index)
+
+ if (elem.trim.nonEmpty) {
+ if (num == 0) {
+ if (elem.startsWith("'")) {
+ tempBuffer += elem
+ processElement(next, 1)
+ } else if (elem.startsWith("\"")) {
+ tempBuffer += elem
+ processElement(next, 2)
+ } else {
+ argsArray += elem
+ processElement(next, 0)
+ }
+ } else {
+ tempBuffer += elem
+ val end1 = elem.endsWith("'") && num == 1
+ val end2 = elem.endsWith("\"") && num == 2
+ if (end1 || end2) {
+ argsArray += tempBuffer.mkString(" ")
+ tempBuffer.clear()
+ processElement(next, 0)
+ } else {
+ processElement(next, num)
+ }
+ }
+ } else {
+ tempBuffer += elem
+ processElement(next, 0)
+ }
+ }
+ processElement(0, 0)
+
+ val programArgs = argsArray.map(_.trim.replaceAll("^[\"|']|[\"|']$","")).toList
+
+ Assertions.assertEquals("localhost:8123", programArgs(1))
+ Assertions.assertEquals("insert \'\'into default.test values (?,?,?,?,?)", programArgs(3))
+ Assertions.assertEquals("insert into default.test values (1,2,3,4,\"111\")", programArgs(5))
+ Assertions.assertEquals("insert into default.test values (1,2,3,4,\'111\')", programArgs(7))
+ Assertions.assertEquals("insert into default.test values (1,2,3,4,\"111\", \'22\', \'\')", programArgs(9))
+
+ }
}