This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch issue-2869 in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 7c1c6cfa5740306913f99bdd03e665c083b56977 Author: benjobs <[email protected]> AuthorDate: Sat Aug 5 15:15:06 2023 +0800 issue-2869 bug fixed. --- .../common/util/PropertiesUtilsTestCase.scala | 70 +++++++++++++--------- .../flink/client/trait/FlinkClientTrait.scala | 62 +++++++++++-------- .../flink/client/test/ParameterTestCase.scala | 67 --------------------- 3 files changed, 78 insertions(+), 121 deletions(-) diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala index e04e0e3be..acd2a431f 100644 --- a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala +++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala @@ -19,26 +19,33 @@ package org.apache.streampark.common.util import org.apache.commons.lang3.StringUtils import org.junit.jupiter.api.{Assertions, Test} +import scala.annotation.tailrec import scala.collection.mutable.ArrayBuffer +import scala.language.postfixOps class PropertiesUtilsTestCase { @Test def testExtractProgramArgs(): Unit = { - val argsStr = "--host localhost:8123\n\n\n" + - "--sql \"\"\"insert into table_a select * from table_b\"\"\"\n" + + val argsStr = "--host localhost:8123\n" + + "--sql \"insert into table_a select * from table_b\"\n" + "--c d\r\n" + - "--x yyy" + "--including-tables \"BASE_CARD_ETPS|BASE_CHECKED_STAT\"\n" val programArgs = new ArrayBuffer[String]() if (StringUtils.isNotEmpty(argsStr)) { - val multiLineChar = "\"\"\"" + val multiChar = "\"" val array = argsStr.split("\\s+") - if (array.filter(_.startsWith(multiLineChar)).isEmpty) { - array.foreach(programArgs +=) + if (!array.exists(_.startsWith(multiChar))) { + array.foreach(x => { + if (x.trim.nonEmpty) { + programArgs += x + } + }) } else { val argsArray = new ArrayBuffer[String]() val tempBuffer = new ArrayBuffer[String]() - def processElement(index: Int, multiLine: Boolean): Unit = { + @tailrec + def processElement(index: Int, multi: Boolean): Unit = { if (index == array.length) { if (tempBuffer.nonEmpty) { @@ -48,43 +55,50 @@ class PropertiesUtilsTestCase { } val next = index + 1 - val elem = array(index) + val elem = array(index).trim - if (elem.trim.nonEmpty) { - if (!multiLine) { - if (elem.startsWith(multiLineChar)) { - tempBuffer += elem.drop(3) - processElement(next, true) - } else { - argsArray += elem - processElement(next, false) - } - } else { - if (elem.endsWith(multiLineChar)) { - tempBuffer += elem.dropRight(3) + if (elem.isEmpty) { + processElement(next, multi = false) + } else { + if (multi) { + if (elem.endsWith(multiChar)) { + tempBuffer += elem.dropRight(1) argsArray += tempBuffer.mkString(" ") tempBuffer.clear() - processElement(next, false) + processElement(next, multi = false) } else { tempBuffer += elem - processElement(next, multiLine) + processElement(next, multi) + } + } else { + if (elem.startsWith(multiChar)) { + if (elem.endsWith(multiChar)) { + tempBuffer += elem.drop(1).dropRight(1) + } else { + tempBuffer += elem.drop(1) + } + processElement(next, multi = true) + } else { + if (elem.endsWith(multiChar)) { + argsArray += elem.dropRight(1) + } else { + argsArray += elem + } + processElement(next, multi = false) } } - } else { - tempBuffer += elem - processElement(next, false) } } - processElement(0, false) - argsArray.foreach(x => programArgs += x.trim) + processElement(0, multi = false) + argsArray.foreach(x => programArgs += x) } } Assertions.assertEquals("localhost:8123", programArgs(1)) Assertions.assertEquals("insert into table_a select * from table_b", programArgs(3)) Assertions.assertEquals("d", programArgs(5)) - Assertions.assertEquals("yyy", programArgs(7)) + Assertions.assertEquals("BASE_CARD_ETPS|BASE_CHECKED_STAT", programArgs(7)) } @Test def testDynamicProperties(): Unit = { diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 8eec8118c..b1077cd45 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -405,56 +405,66 @@ trait FlinkClientTrait extends Logger { } private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = { - val programArgs = new ArrayBuffer[String]() + val args = submitRequest.args - if (StringUtils.isNotEmpty(submitRequest.args)) { - val multiLineChar = "\"\"\"" - val array = submitRequest.args.split("\\s+") - if (!array.exists(_.startsWith(multiLineChar))) { + if (StringUtils.isNotEmpty(args)) { + val multiChar = "\"" + val array = args.split("\\s+") + if (!array.exists(_.startsWith(multiChar))) { array.foreach(programArgs +=) } else { val argsArray = new ArrayBuffer[String]() val tempBuffer = new ArrayBuffer[String]() - @tailrec def processElement(index: Int, multiLine: Boolean): Unit = { + @tailrec + def processElement(index: Int, multi: Boolean): Unit = { + if (index == array.length) { if (tempBuffer.nonEmpty) { argsArray += tempBuffer.mkString(" ") } return } + val next = index + 1 - val elem = array(index) + val elem = array(index).trim - if (elem.trim.nonEmpty) { - if (!multiLine) { - if (elem.startsWith(multiLineChar)) { - tempBuffer += elem.drop(3) - processElement(next, multiLine = true) - } else { - argsArray += elem - processElement(next, multiLine = false) - } - } else { - if (elem.endsWith(multiLineChar)) { - tempBuffer += elem.dropRight(3) + if (elem.isEmpty) { + processElement(next, multi = false) + } else { + if (multi) { + if (elem.endsWith(multiChar)) { + tempBuffer += elem.dropRight(1) argsArray += tempBuffer.mkString(" ") tempBuffer.clear() - processElement(next, multiLine = false) + processElement(next, multi = false) } else { tempBuffer += elem - processElement(next, multiLine) + processElement(next, multi) + } + } else { + if (elem.startsWith(multiChar)) { + if (elem.endsWith(multiChar)) { + tempBuffer += elem.drop(1).dropRight(1) + } else { + tempBuffer += elem.drop(1) + } + processElement(next, multi = true) + } else { + if (elem.endsWith(multiChar)) { + argsArray += elem.dropRight(1) + } else { + argsArray += elem + } + processElement(next, multi = false) } } - } else { - tempBuffer += elem - processElement(next, multiLine = false) } } - processElement(0, multiLine = false) - argsArray.foreach(x => programArgs += x.trim) + processElement(0, multi = false) + argsArray.foreach(x => programArgs += x) } } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala index 32929d2d2..4827d824d 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/ParameterTestCase.scala @@ -16,12 +16,9 @@ */ package org.apache.streampark.flink.client.test -import org.apache.commons.lang3.StringUtils import org.apache.flink.api.java.utils.ParameterTool import org.junit.jupiter.api.{Assertions, Test} -import scala.collection.mutable.ArrayBuffer - class ParameterTestCase { @Test def readArgs(): Unit = { @@ -42,68 +39,4 @@ class ParameterTestCase { Assertions.assertEquals("5", param.get("flink.deployment.option.parallelism")) } - @Test def testExtractProgramArgs(): Unit = { - val argsStr = "--host localhost:8123\n\n\n" + - "--sql \"\"\"insert into table_a select * from table_b\"\"\"\n" + - "--c d\r\n" + - "--x yyy" - val programArgs = new ArrayBuffer[String]() - if (StringUtils.isNotEmpty(argsStr)) { - val multiLineChar = "\"\"\"" - val array = argsStr.split("\\s+") - if (array.filter(_.startsWith(multiLineChar)).isEmpty) { - array.foreach(programArgs +=) - } else { - val argsArray = new ArrayBuffer[String]() - val tempBuffer = new ArrayBuffer[String]() - - def processElement(index: Int, multiLine: Boolean): Unit = { - - if (index == array.length) { - if (tempBuffer.nonEmpty) { - argsArray += tempBuffer.mkString(" ") - } - return - } - - val next = index + 1 - val elem = array(index) - - if (elem.trim.nonEmpty) { - if (!multiLine) { - if (elem.startsWith(multiLineChar)) { - tempBuffer += elem.drop(3) - processElement(next, true) - } else { - argsArray += elem - processElement(next, false) - } - } else { - if (elem.endsWith(multiLineChar)) { - tempBuffer += elem.dropRight(3) - argsArray += tempBuffer.mkString(" ") - tempBuffer.clear() - processElement(next, false) - } else { - tempBuffer += elem - processElement(next, multiLine) - } - } - } else { - tempBuffer += elem - processElement(next, false) - } - } - - processElement(0, false) - argsArray.foreach(x => programArgs += x.trim) - } - } - - Assertions.assertEquals("localhost:8123", programArgs(1)) - Assertions.assertEquals("insert into table_a select * from table_b", programArgs(3)) - Assertions.assertEquals("d", programArgs(5)) - Assertions.assertEquals("yyy", programArgs(7)) - } - }
