This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1599d8deb [Bug] extractProgramArgs parsing args parameter error (#1652)
1599d8deb is described below

commit 1599d8deb66eb94375cb61e37af9f70dca9ded21
Author: fantasticKe <[email protected]>
AuthorDate: Tue Sep 20 18:13:37 2022 +0800

    [Bug] extractProgramArgs parsing args parameter error (#1652)
    
    * fixed(streampark-flink-submit-core):extractProgramArgs parsing args parameter error
---
 .../flink/submit/trait/FlinkSubmitTrait.scala      | 72 +++++++++++++++++---
 .../flink/submit/test/ParameterTestCase.scala      | 78 ++++++++++++++++++++--
 2 files changed, 134 insertions(+), 16 deletions(-)

diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 57ec45112..3a9409268 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -18,16 +18,9 @@
 package org.apache.streampark.flink.submit.`trait`
 
 import com.google.common.collect.Lists
-import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.conf.Workspace
-import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, 
ExecutionMode, ResolveOrder}
-import org.apache.streampark.common.util.{Logger, SystemPropertyUtils, Utils}
-import org.apache.streampark.flink.core.conf.FlinkRunOption
-import org.apache.streampark.flink.core.{ClusterClient => ClusterClientWrapper}
-import org.apache.streampark.flink.submit.bean._
 import org.apache.commons.cli.{CommandLine, Options}
 import org.apache.commons.collections.MapUtils
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines
 import org.apache.flink.client.cli._
@@ -37,10 +30,17 @@ import org.apache.flink.configuration._
 import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
 import org.apache.flink.util.FlinkException
 import org.apache.flink.util.Preconditions.checkNotNull
-import java.util.{Map => JavaMap}
+import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.Workspace
+import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, 
ExecutionMode, ResolveOrder}
+import org.apache.streampark.common.util.{Logger, SystemPropertyUtils, Utils}
+import org.apache.streampark.flink.core.conf.FlinkRunOption
+import org.apache.streampark.flink.core.{ClusterClient => ClusterClientWrapper}
+import org.apache.streampark.flink.submit.bean._
+
 import java.io.File
 import java.util.concurrent.TimeUnit
-import java.util.{Collections, List => JavaList}
+import java.util.{Collections, List => JavaList, Map => JavaMap}
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -356,7 +356,57 @@ trait FlinkSubmitTrait extends Logger {
 
     val programArgs = new ArrayBuffer[String]()
 
-    Try(submitRequest.args.split("\\s+")).getOrElse(Array()).foreach(x => if 
(x.nonEmpty) programArgs += x)
+    if (StringUtils.isNotEmpty(submitRequest.args)) {
+
+      val array = submitRequest.args.split("\\s")
+      val argsArray = new ArrayBuffer[String]()
+      val tempBuffer = new ArrayBuffer[String]()
+
+      def processElement(index: Int, num: Int): Unit = {
+
+        if (index == array.length) {
+          if (tempBuffer.nonEmpty) {
+            argsArray += tempBuffer.mkString(" ")
+          }
+          return
+        }
+
+        val next = index + 1
+        val elem = array(index)
+
+        if (elem.trim.nonEmpty) {
+          if (num == 0) {
+            if (elem.startsWith("'")) {
+              tempBuffer += elem
+              processElement(next, 1)
+            } else if (elem.startsWith("\"")) {
+              tempBuffer += elem
+              processElement(next, 2)
+            } else {
+              argsArray += elem
+              processElement(next, 0)
+            }
+          } else {
+            tempBuffer += elem
+            val end1 = elem.endsWith("'") && num == 1
+            val end2 = elem.endsWith("\"") && num == 2
+            if (end1 || end2) {
+              argsArray += tempBuffer.mkString(" ")
+              tempBuffer.clear()
+              processElement(next, 0)
+            } else {
+              processElement(next, num)
+            }
+          }
+        } else {
+          tempBuffer += elem
+          processElement(next, 0)
+        }
+      }
+
+      processElement(0, 0)
+      argsArray.foreach(x => programArgs += 
x.trim.replaceAll("^[\"|']|[\"|']$", ""))
+    }
 
     if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
       programArgs += PARAM_KEY_FLINK_CONF
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
index a74fa5826..e54448381 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
@@ -19,9 +19,14 @@ package org.apache.streampark.flink.submit.test
 import org.apache.flink.api.java.utils.ParameterTool
 import org.junit.jupiter.api.{Assertions, Test}
 
+import java.util.regex.Pattern
+import scala.annotation.tailrec
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
 class ParameterTestCase {
 
-  @Test def readArgs():Unit = {
+  @Test def readArgs(): Unit = {
     val arg = Array(
       "--flink.deployment.option.parallelism",
       "10"
@@ -36,10 +41,73 @@ class ParameterTestCase {
     )
     val param = 
ParameterTool.fromArgs(arg).mergeWith(ParameterTool.fromArgs(args))
 
-    Assertions.assertEquals(param.get("flink.home"), 
"hdfs://nameservice1/streampark/flink/flink-1.11.1")
-    Assertions.assertEquals(param.get("app.name"), "testApp123")
-    Assertions.assertEquals(param.get("flink.deployment.option.parallelism"), 
"5")
-
+    
Assertions.assertEquals("hdfs://nameservice1/streampark/flink/flink-1.11.1", 
param.get("flink.home"))
+    Assertions.assertEquals("testApp123", param.get("app.name"))
+    Assertions.assertEquals("5", 
param.get("flink.deployment.option.parallelism"))
   }
 
+  @Test def testExtractProgramArgs(): Unit = {
+
+    val argsStr = "--url localhost:8123 \n" +
+      "--insertSql1 'insert \'\'into default.test values (?,?,?,?,?)' \n" +
+      "--insertSql2 'insert into default.test values (1,2,3,4,\"111\")'\n "+
+      "--insertSql2 \"insert into default.test values (1,2,3,4,\'111\')\" \n" +
+      "--insertSql2 'insert into default.test values (1,2,3,4,\"111\", \'22\', 
\'\')'"
+
+    val array = argsStr.split("\\s")
+    val argsArray = new ArrayBuffer[String]()
+    val tempBuffer = new ArrayBuffer[String]()
+
+    def processElement(index: Int, num: Int): Unit = {
+
+      if (index == array.length) {
+        if (tempBuffer.nonEmpty) {
+          argsArray += tempBuffer.mkString(" ")
+        }
+        return
+      }
+
+      val next = index + 1
+      val elem = array(index)
+
+      if (elem.trim.nonEmpty) {
+        if (num == 0) {
+          if (elem.startsWith("'")) {
+            tempBuffer += elem
+            processElement(next, 1)
+          } else if (elem.startsWith("\"")) {
+            tempBuffer += elem
+            processElement(next, 2)
+          } else {
+            argsArray += elem
+            processElement(next, 0)
+          }
+        } else {
+          tempBuffer += elem
+          val end1 = elem.endsWith("'") && num == 1
+          val end2 = elem.endsWith("\"") && num == 2
+          if (end1 || end2) {
+            argsArray += tempBuffer.mkString(" ")
+            tempBuffer.clear()
+            processElement(next, 0)
+          } else {
+            processElement(next, num)
+          }
+        }
+      } else {
+        tempBuffer += elem
+        processElement(next, 0)
+      }
+    }
+    processElement(0, 0)
+
+    val programArgs = 
argsArray.map(_.trim.replaceAll("^[\"|']|[\"|']$","")).toList
+
+    Assertions.assertEquals("localhost:8123", programArgs(1))
+    Assertions.assertEquals("insert \'\'into default.test values (?,?,?,?,?)", 
programArgs(3))
+    Assertions.assertEquals("insert into default.test values 
(1,2,3,4,\"111\")", programArgs(5))
+    Assertions.assertEquals("insert into default.test values 
(1,2,3,4,\'111\')", programArgs(7))
+    Assertions.assertEquals("insert into default.test values (1,2,3,4,\"111\", 
\'22\', \'\')", programArgs(9))
+
+  }
 }

Reply via email to