This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch args in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 778628175bac71956be9d075bb2df91eac2cacd0 Author: benjobs <[email protected]> AuthorDate: Sun Oct 29 00:47:16 2023 +0800 [Improve] packageArgs improvement --- .../streampark/common/conf/FlinkVersion.scala | 2 + .../streampark/common/util/PropertiesUtils.scala | 26 ++++++- .../common/util/PropertiesUtilsTestCase.scala | 89 +++++----------------- .../flink/client/trait/FlinkClientTrait.scala | 48 +++++++----- 4 files changed, 73 insertions(+), 92 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala index c05e28a3e..5455a6d5d 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala @@ -59,6 +59,8 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg lib } + lazy val flinkLibs: List[NetURL] = flinkLib.listFiles().map(_.toURI.toURL).toList + lazy val version: String = { val flinkVersion = new AtomicReference[String] val cmd = List( diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index 706b3b090..11d0d7c0b 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -30,7 +30,7 @@ import java.util.regex.Pattern import scala.collection.JavaConverters._ import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable -import scala.collection.mutable.{Map => MutableMap} +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} object PropertiesUtils extends Logger { @@ -305,6 +305,30 @@ object PropertiesUtils extends Logger { } } + @Nonnull def 
extractArguments(args: String): List[String] = { + val programArgs = new ArrayBuffer[String]() + if (StringUtils.isNotEmpty(args)) { + val array = args.split("\\s+") + val iter = array.iterator + while (iter.hasNext) { + val v = iter.next() + val p = v.take(1) + p match { + case "'" | "\"" => + var value = v + if (!v.endsWith(p)) { + while (!value.endsWith(p) && iter.hasNext) { + value += s" ${iter.next()}" + } + } + programArgs += value.replaceAll(s"^$p|$p$$", "") + case _ => programArgs += v + } + } + } + programArgs.toList + } + @Nonnull def extractDynamicPropertiesAsJava(properties: String): JavaMap[String, String] = new JavaMap[String, String](extractDynamicProperties(properties).asJava) diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala index 01e1b2743..9715327f8 100644 --- a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala +++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala @@ -16,83 +16,32 @@ */ package org.apache.streampark.common.util -import org.apache.commons.lang3.StringUtils import org.junit.jupiter.api.{Assertions, Test} -import scala.annotation.tailrec -import scala.collection.mutable.ArrayBuffer import scala.language.postfixOps class PropertiesUtilsTestCase { @Test def testExtractProgramArgs(): Unit = { - val argsStr = "--host localhost:8123\n" + - "--sql \"insert into table_a select * from table_b\"\n" + - "--c d\r\n" + - "--including-tables \"BASE_CARD_ETPS|BASE_CHECKED_STAT\"\n" - val programArgs = new ArrayBuffer[String]() - if (StringUtils.isNotBlank(argsStr)) { - val multiChar = "\"" - val array = argsStr.split("\\s+") - if (!array.exists(_.startsWith(multiChar))) { - array.foreach( - x => { - if (x.trim.nonEmpty) { - programArgs += x - } - }) - } else { - val argsArray = new 
ArrayBuffer[String]() - val tempBuffer = new ArrayBuffer[String]() - - @tailrec - def processElement(index: Int, multi: Boolean): Unit = { - - if (index == array.length) { - if (tempBuffer.nonEmpty) { - argsArray += tempBuffer.mkString(" ") - } - return - } - - val next = index + 1 - val elem = array(index).trim - val until = if (elem.endsWith(multiChar)) 1 else 0 - - if (elem.isEmpty) { - processElement(next, multi = false) - } else { - if (multi) { - if (elem.endsWith(multiChar)) { - tempBuffer += elem.dropRight(1) - argsArray += tempBuffer.mkString(" ") - tempBuffer.clear() - processElement(next, multi = false) - } else { - tempBuffer += elem - processElement(next, multi) - } - } else { - if (elem.startsWith(multiChar)) { - tempBuffer += elem.drop(1).dropRight(until) - processElement(next, multi = true) - } else { - argsArray += elem.dropRight(until) - processElement(next, multi = false) - } - } - } - } - - processElement(0, multi = false) - argsArray.foreach(x => programArgs += x) - } - } - - Assertions.assertEquals("localhost:8123", programArgs(1)) - Assertions.assertEquals("insert into table_a select * from table_b", programArgs(3)) - Assertions.assertEquals("d", programArgs(5)) - Assertions.assertEquals("BASE_CARD_ETPS|BASE_CHECKED_STAT", programArgs(7)) + val args = + "mysql-sync-database " + + "--database employees " + + "--mysql-conf hostname=127.0.0.1 " + + "--mysql-conf port=3306 " + + "--mysql-conf username=root " + + "--mysql-conf password=123456 " + + "--mysql-conf database-name=employees " + + "--including-tables 'test|test.*' " + + "--excluding-tables \"emp_*\" " + + "--query 'select * from employees where age > 20' " + + "--sink-conf fenodes=127.0.0.1:8030 " + + "--sink-conf username=root " + + "--sink-conf password= " + + "--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 " + + "--sink-conf sink.label-prefix=label " + + "--table-conf replication_num=1" + val programArgs = PropertiesUtils.extractArguments(args) + println(programArgs) } @Test def 
testDynamicProperties(): Unit = { diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 75a05326f..271d3d0df 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -237,27 +237,11 @@ trait FlinkClientTrait extends Logger { private[client] def getJobGraph( submitRequest: SubmitRequest, flinkConfig: Configuration): (PackagedProgram, JobGraph) = { - if (submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK) { - val pythonVenv: String = Workspace.local.APP_PYTHON_VENV - if (!FsOperator.lfs.exists(pythonVenv)) { - throw new RuntimeException(s"$pythonVenv File does not exist") - } - - val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib" - if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) { - flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib)) - } - flinkConfig - // python.archives - .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv) - // python.client.executable - .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE) - // python.executable - .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE) - } - - val packageProgram = PackagedProgram.newBuilder + val pkgBuilder = PackagedProgram.newBuilder + .setUserClassPaths( + Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*) + ) .setArguments( flinkConfig .getOptional(ApplicationConfiguration.APPLICATION_ARGS) @@ -266,8 +250,30 @@ trait FlinkClientTrait extends Logger { 
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get() ) .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings) - .build() + submitRequest.developmentMode match { + case FlinkDevelopmentMode.PYFLINK => + val pythonVenv: String = Workspace.local.APP_PYTHON_VENV + if (!FsOperator.lfs.exists(pythonVenv)) { + throw new RuntimeException(s"$pythonVenv File does not exist") + } + + val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib" + if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) { + flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib)) + } + + flinkConfig + // python.archives + .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv) + // python.client.executable + .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE) + // python.executable + .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE) + case _ => + } + + val packageProgram = pkgBuilder.build() val jobGraph = PackagedProgramUtils.createJobGraph( packageProgram, flinkConfig,
