This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch issue-2869
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 7c1c6cfa5740306913f99bdd03e665c083b56977
Author: benjobs <[email protected]>
AuthorDate: Sat Aug 5 15:15:06 2023 +0800

    issue-2869 bug fixed.
---
 .../common/util/PropertiesUtilsTestCase.scala      | 70 +++++++++++++---------
 .../flink/client/trait/FlinkClientTrait.scala      | 62 +++++++++++--------
 .../flink/client/test/ParameterTestCase.scala      | 67 ---------------------
 3 files changed, 78 insertions(+), 121 deletions(-)

diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
index e04e0e3be..acd2a431f 100644
--- a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
+++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
@@ -19,26 +19,33 @@ package org.apache.streampark.common.util
 import org.apache.commons.lang3.StringUtils
 import org.junit.jupiter.api.{Assertions, Test}
 
+import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
+import scala.language.postfixOps
 
 class PropertiesUtilsTestCase {
 
   @Test def testExtractProgramArgs(): Unit = {
-    val argsStr = "--host localhost:8123\n\n\n" +
-      "--sql \"\"\"insert into table_a select * from table_b\"\"\"\n" +
+    val argsStr = "--host localhost:8123\n" +
+      "--sql \"insert into table_a select * from table_b\"\n" +
       "--c d\r\n" +
-      "--x yyy"
+      "--including-tables \"BASE_CARD_ETPS|BASE_CHECKED_STAT\"\n"
     val programArgs = new ArrayBuffer[String]()
     if (StringUtils.isNotEmpty(argsStr)) {
-      val multiLineChar = "\"\"\""
+      val multiChar = "\""
       val array = argsStr.split("\\s+")
-      if (array.filter(_.startsWith(multiLineChar)).isEmpty) {
-        array.foreach(programArgs +=)
+      if (!array.exists(_.startsWith(multiChar))) {
+        array.foreach(x => {
+          if (x.trim.nonEmpty) {
+            programArgs += x
+          }
+        })
       } else {
         val argsArray = new ArrayBuffer[String]()
         val tempBuffer = new ArrayBuffer[String]()
 
-        def processElement(index: Int, multiLine: Boolean): Unit = {
+        @tailrec
+        def processElement(index: Int, multi: Boolean): Unit = {
 
           if (index == array.length) {
             if (tempBuffer.nonEmpty) {
@@ -48,43 +55,50 @@ class PropertiesUtilsTestCase {
           }
 
           val next = index + 1
-          val elem = array(index)
+          val elem = array(index).trim
 
-          if (elem.trim.nonEmpty) {
-            if (!multiLine) {
-              if (elem.startsWith(multiLineChar)) {
-                tempBuffer += elem.drop(3)
-                processElement(next, true)
-              } else {
-                argsArray += elem
-                processElement(next, false)
-              }
-            } else {
-              if (elem.endsWith(multiLineChar)) {
-                tempBuffer += elem.dropRight(3)
+          if (elem.isEmpty) {
+            processElement(next, multi = false)
+          } else {
+            if (multi) {
+              if (elem.endsWith(multiChar)) {
+                tempBuffer += elem.dropRight(1)
                 argsArray += tempBuffer.mkString(" ")
                 tempBuffer.clear()
-                processElement(next, false)
+                processElement(next, multi = false)
               } else {
                 tempBuffer += elem
-                processElement(next, multiLine)
+                processElement(next, multi)
+              }
+            } else {
+              if (elem.startsWith(multiChar)) {
+                if (elem.endsWith(multiChar)) {
+                  tempBuffer += elem.drop(1).dropRight(1)
+                } else {
+                  tempBuffer += elem.drop(1)
+                }
+                processElement(next, multi = true)
+              } else {
+                if (elem.endsWith(multiChar)) {
+                  argsArray += elem.dropRight(1)
+                } else {
+                  argsArray += elem
+                }
+                processElement(next, multi = false)
               }
             }
-          } else {
-            tempBuffer += elem
-            processElement(next, false)
           }
         }
 
-        processElement(0, false)
-        argsArray.foreach(x => programArgs += x.trim)
+        processElement(0, multi = false)
+        argsArray.foreach(x => programArgs += x)
       }
     }
 
     Assertions.assertEquals("localhost:8123", programArgs(1))
     Assertions.assertEquals("insert into table_a select * from table_b", programArgs(3))
     Assertions.assertEquals("d", programArgs(5))
-    Assertions.assertEquals("yyy", programArgs(7))
+    Assertions.assertEquals("BASE_CARD_ETPS|BASE_CHECKED_STAT", programArgs(7))
   }
 
   @Test def testDynamicProperties(): Unit = {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 8eec8118c..b1077cd45 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -405,56 +405,66 @@ trait FlinkClientTrait extends Logger {
   }
 
   private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = {
-
     val programArgs = new ArrayBuffer[String]()
+    val args = submitRequest.args
 
-    if (StringUtils.isNotEmpty(submitRequest.args)) {
-      val multiLineChar = "\"\"\""
-      val array = submitRequest.args.split("\\s+")
-      if (!array.exists(_.startsWith(multiLineChar))) {
+    if (StringUtils.isNotEmpty(args)) {
+      val multiChar = "\""
+      val array = args.split("\\s+")
+      if (!array.exists(_.startsWith(multiChar))) {
         array.foreach(programArgs +=)
       } else {
         val argsArray = new ArrayBuffer[String]()
         val tempBuffer = new ArrayBuffer[String]()
 
-        @tailrec def processElement(index: Int, multiLine: Boolean): Unit = {
+        @tailrec
+        def processElement(index: Int, multi: Boolean): Unit = {
+
           if (index == array.length) {
             if (tempBuffer.nonEmpty) {
               argsArray += tempBuffer.mkString(" ")
             }
             return
           }
+
           val next = index + 1
-          val elem = array(index)
+          val elem = array(index).trim
 
-          if (elem.trim.nonEmpty) {
-            if (!multiLine) {
-              if (elem.startsWith(multiLineChar)) {
-                tempBuffer += elem.drop(3)
-                processElement(next, multiLine = true)
-              } else {
-                argsArray += elem
-                processElement(next, multiLine = false)
-              }
-            } else {
-              if (elem.endsWith(multiLineChar)) {
-                tempBuffer += elem.dropRight(3)
+          if (elem.isEmpty) {
+            processElement(next, multi = false)
+          } else {
+            if (multi) {
+              if (elem.endsWith(multiChar)) {
+                tempBuffer += elem.dropRight(1)
                 argsArray += tempBuffer.mkString(" ")
                 tempBuffer.clear()
-                processElement(next, multiLine = false)
+                processElement(next, multi = false)
               } else {
                 tempBuffer += elem
-                processElement(next, multiLine)
+                processElement(next, multi)
+              }
+            } else {
+              if (elem.startsWith(multiChar)) {
+                if (elem.endsWith(multiChar)) {
+                  tempBuffer += elem.drop(1).dropRight(1)
+                } else {
+                  tempBuffer += elem.drop(1)
+                }
+                processElement(next, multi = true)
+              } else {
+                if (elem.endsWith(multiChar)) {
+                  argsArray += elem.dropRight(1)
+                } else {
+                  argsArray += elem
+                }
+                processElement(next, multi = false)
               }
             }
-          } else {
-            tempBuffer += elem
-            processElement(next, multiLine = false)
           }
         }
 
-        processElement(0, multiLine = false)
-        argsArray.foreach(x => programArgs += x.trim)
+        processElement(0, multi = false)
+        argsArray.foreach(x => programArgs += x)
       }
     }
 
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala
index 32929d2d2..4827d824d 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala
@@ -16,12 +16,9 @@
  */
 package org.apache.streampark.flink.client.test
 
-import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.java.utils.ParameterTool
 import org.junit.jupiter.api.{Assertions, Test}
 
-import scala.collection.mutable.ArrayBuffer
-
 class ParameterTestCase {
 
   @Test def readArgs(): Unit = {
@@ -42,68 +39,4 @@ class ParameterTestCase {
     Assertions.assertEquals("5", param.get("flink.deployment.option.parallelism"))
   }
 
-  @Test def testExtractProgramArgs(): Unit = {
-    val argsStr = "--host localhost:8123\n\n\n" +
-      "--sql \"\"\"insert into table_a select * from table_b\"\"\"\n" +
-      "--c d\r\n" +
-      "--x yyy"
-    val programArgs = new ArrayBuffer[String]()
-    if (StringUtils.isNotEmpty(argsStr)) {
-      val multiLineChar = "\"\"\""
-      val array = argsStr.split("\\s+")
-      if (array.filter(_.startsWith(multiLineChar)).isEmpty) {
-        array.foreach(programArgs +=)
-      } else {
-        val argsArray = new ArrayBuffer[String]()
-        val tempBuffer = new ArrayBuffer[String]()
-
-        def processElement(index: Int, multiLine: Boolean): Unit = {
-
-          if (index == array.length) {
-            if (tempBuffer.nonEmpty) {
-              argsArray += tempBuffer.mkString(" ")
-            }
-            return
-          }
-
-          val next = index + 1
-          val elem = array(index)
-
-          if (elem.trim.nonEmpty) {
-            if (!multiLine) {
-              if (elem.startsWith(multiLineChar)) {
-                tempBuffer += elem.drop(3)
-                processElement(next, true)
-              } else {
-                argsArray += elem
-                processElement(next, false)
-              }
-            } else {
-              if (elem.endsWith(multiLineChar)) {
-                tempBuffer += elem.dropRight(3)
-                argsArray += tempBuffer.mkString(" ")
-                tempBuffer.clear()
-                processElement(next, false)
-              } else {
-                tempBuffer += elem
-                processElement(next, multiLine)
-              }
-            }
-          } else {
-            tempBuffer += elem
-            processElement(next, false)
-          }
-        }
-
-        processElement(0, false)
-        argsArray.foreach(x => programArgs += x.trim)
-      }
-    }
-
-    Assertions.assertEquals("localhost:8123", programArgs(1))
-    Assertions.assertEquals("insert into table_a select * from table_b", programArgs(3))
-    Assertions.assertEquals("d", programArgs(5))
-    Assertions.assertEquals("yyy", programArgs(7))
-  }
-
 }

Reply via email to