This is an automated email from the ASF dual-hosted git repository.
linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 4dc5eb7ea [Bug] extract program arguments bug fixed. (#3297)
4dc5eb7ea is described below
commit 4dc5eb7eaf91382d0fcf7001c27cffb65bf0d95d
Author: benjobs <[email protected]>
AuthorDate: Mon Oct 30 21:59:43 2023 +0800
[Bug] extract program arguments bug fixed. (#3297)
* [Improve] packageArgs improvement
* minor improvement
* [Improve] minor improve
* [Improve] method extractArguments improvement
* [Improve] method extractProgramArgs improvement
* [Improve] delete|update syntax support
* [Improve] delete|update syntax minor improvement
* [improve] jobGraph submit improvement
---------
Co-authored-by: benjobs <[email protected]>
---
.../streampark/common/conf/FlinkVersion.scala | 2 +
.../streampark/common/util/PropertiesUtils.scala | 26 +++-
.../common/util/PropertiesUtilsTestCase.scala | 89 +++--------
.../impl/KubernetesNativeSessionClient.scala | 2 +-
.../flink/client/impl/RemoteClient.scala | 3 +-
.../flink/client/impl/YarnApplicationClient.scala | 6 +-
.../flink/client/trait/FlinkClientTrait.scala | 166 +++++++++------------
.../streampark/flink/core/FlinkSqlExecutor.scala | 12 +-
.../streampark/flink/core/SqlCommandParser.scala | 6 +
.../apache/streampark/flink/cli/SqlClient.scala | 21 ++-
10 files changed, 153 insertions(+), 180 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index c05e28a3e..5455a6d5d 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -59,6 +59,8 @@ class FlinkVersion(val flinkHome: String) extends
java.io.Serializable with Logg
lib
}
+ lazy val flinkLibs: List[NetURL] =
flinkLib.listFiles().map(_.toURI.toURL).toList
+
lazy val version: String = {
val flinkVersion = new AtomicReference[String]
val cmd = List(
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 706b3b090..b0db39578 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -30,7 +30,7 @@ import java.util.regex.Pattern
import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable
-import scala.collection.mutable.{Map => MutableMap}
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
object PropertiesUtils extends Logger {
@@ -305,6 +305,30 @@ object PropertiesUtils extends Logger {
}
}
+ @Nonnull def extractArguments(args: String): List[String] = {
+ val programArgs = new ArrayBuffer[String]()
+ if (StringUtils.isNotEmpty(args)) {
+ val array = args.split("\\s+")
+ val iter = array.iterator
+ while (iter.hasNext) {
+ val v = iter.next()
+ val p = v.take(1)
+ p match {
+ case "'" | "\"" =>
+ var value = v
+ if (!v.endsWith(p)) {
+ while (!value.endsWith(p) && iter.hasNext) {
+ value += s" ${iter.next()}"
+ }
+ }
+ programArgs += value.substring(1, value.length - 1)
+ case _ => programArgs += v
+ }
+ }
+ }
+ programArgs.toList
+ }
+
@Nonnull def extractDynamicPropertiesAsJava(properties: String):
JavaMap[String, String] =
new JavaMap[String, String](extractDynamicProperties(properties).asJava)
diff --git
a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
index 01e1b2743..9715327f8 100644
---
a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
+++
b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
@@ -16,83 +16,32 @@
*/
package org.apache.streampark.common.util
-import org.apache.commons.lang3.StringUtils
import org.junit.jupiter.api.{Assertions, Test}
-import scala.annotation.tailrec
-import scala.collection.mutable.ArrayBuffer
import scala.language.postfixOps
class PropertiesUtilsTestCase {
@Test def testExtractProgramArgs(): Unit = {
- val argsStr = "--host localhost:8123\n" +
- "--sql \"insert into table_a select * from table_b\"\n" +
- "--c d\r\n" +
- "--including-tables \"BASE_CARD_ETPS|BASE_CHECKED_STAT\"\n"
- val programArgs = new ArrayBuffer[String]()
- if (StringUtils.isNotBlank(argsStr)) {
- val multiChar = "\""
- val array = argsStr.split("\\s+")
- if (!array.exists(_.startsWith(multiChar))) {
- array.foreach(
- x => {
- if (x.trim.nonEmpty) {
- programArgs += x
- }
- })
- } else {
- val argsArray = new ArrayBuffer[String]()
- val tempBuffer = new ArrayBuffer[String]()
-
- @tailrec
- def processElement(index: Int, multi: Boolean): Unit = {
-
- if (index == array.length) {
- if (tempBuffer.nonEmpty) {
- argsArray += tempBuffer.mkString(" ")
- }
- return
- }
-
- val next = index + 1
- val elem = array(index).trim
- val until = if (elem.endsWith(multiChar)) 1 else 0
-
- if (elem.isEmpty) {
- processElement(next, multi = false)
- } else {
- if (multi) {
- if (elem.endsWith(multiChar)) {
- tempBuffer += elem.dropRight(1)
- argsArray += tempBuffer.mkString(" ")
- tempBuffer.clear()
- processElement(next, multi = false)
- } else {
- tempBuffer += elem
- processElement(next, multi)
- }
- } else {
- if (elem.startsWith(multiChar)) {
- tempBuffer += elem.drop(1).dropRight(until)
- processElement(next, multi = true)
- } else {
- argsArray += elem.dropRight(until)
- processElement(next, multi = false)
- }
- }
- }
- }
-
- processElement(0, multi = false)
- argsArray.foreach(x => programArgs += x)
- }
- }
-
- Assertions.assertEquals("localhost:8123", programArgs(1))
- Assertions.assertEquals("insert into table_a select * from table_b",
programArgs(3))
- Assertions.assertEquals("d", programArgs(5))
- Assertions.assertEquals("BASE_CARD_ETPS|BASE_CHECKED_STAT", programArgs(7))
+ val args =
+ "mysql-sync-database " +
+ "--database employees " +
+ "--mysql-conf hostname=127.0.0.1 " +
+ "--mysql-conf port=3306 " +
+ "--mysql-conf username=root " +
+ "--mysql-conf password=123456 " +
+ "--mysql-conf database-name=employees " +
+ "--including-tables 'test|test.*' " +
+ "--excluding-tables \"emp_*\" " +
+ "--query 'select * from employees where age > 20' " +
+ "--sink-conf fenodes=127.0.0.1:8030 " +
+ "--sink-conf username=root " +
+ "--sink-conf password= " +
+ "--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 " +
+ "--sink-conf sink.label-prefix=label" +
+ "--table-conf replication_num=1"
+ val programArgs = PropertiesUtils.extractArguments(args)
+ println(programArgs)
}
@Test def testDynamicProperties(): Unit = {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 8004fb91c..e2684014e 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -57,7 +57,7 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId),
s"[flink-submit] submit flink job failed, clusterId is null,
mode=${flinkConfig.get(DeploymentOptions.TARGET)}"
)
- super.trySubmit(submitRequest, flinkConfig)(restApiSubmit)(jobGraphSubmit)
+ super.trySubmit(submitRequest, flinkConfig)(jobGraphSubmit, restApiSubmit)
}
/** Submit flink session job via rest api. */
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
index fe4fe454a..6002306f1 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
@@ -44,8 +44,7 @@ object RemoteClient extends FlinkClientTrait {
submitRequest: SubmitRequest,
flinkConfig: Configuration): SubmitResponse = {
// submit job
- super.trySubmit(submitRequest, flinkConfig)(restApiSubmit)(jobGraphSubmit)
-
+ super.trySubmit(submitRequest, flinkConfig)(jobGraphSubmit, restApiSubmit)
}
override def doCancel(
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index f5862a082..f7b38cdde 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -102,10 +102,8 @@ object YarnApplicationClient extends YarnClientTrait {
throw new RuntimeException(s"$pyVenv File does not exist")
}
- val localLib: String =
s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
- if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib))
{
- flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
- }
+ // including $app/lib
+ includingPipelineJars(submitRequest, flinkConfig)
// yarn.ship-files
val shipFiles = new util.ArrayList[String]()
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 75a05326f..5025a92b2 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -18,11 +18,11 @@
package org.apache.streampark.flink.client.`trait`
import org.apache.streampark.common.Constant
-import org.apache.streampark.common.conf.{ConfigKeys, Workspace}
import org.apache.streampark.common.conf.ConfigKeys._
+import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.enums.{ApplicationType,
FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode}
import org.apache.streampark.common.fs.FsOperator
-import org.apache.streampark.common.util.{DeflaterUtils, FileUtils, Logger,
SystemPropertyUtils}
+import org.apache.streampark.common.util.{DeflaterUtils, ExceptionUtils,
FileUtils, Logger, PropertiesUtils, SystemPropertyUtils, Utils}
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.FlinkClusterClient
import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -217,57 +217,79 @@ trait FlinkClientTrait extends Logger {
def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration):
CancelResponse
def trySubmit(submitRequest: SubmitRequest, flinkConfig: Configuration)(
- restApiFunc: (SubmitRequest, Configuration) => SubmitResponse)(
- jobGraphFunc: (SubmitRequest, Configuration) => SubmitResponse):
SubmitResponse = {
- // Prioritize using Rest API submit while using JobGraph submit plan as
backup
+ jobGraphFunc: (SubmitRequest, Configuration) => SubmitResponse,
+ restApiFunc: (SubmitRequest, Configuration) => SubmitResponse):
SubmitResponse = {
+ // Prioritize using JobGraph submit plan while using Rest API submit plan
as backup
Try {
- logInfo(s"[flink-submit] Attempting to submit in Rest API Submit Plan.")
- restApiFunc(submitRequest, flinkConfig)
- }.getOrElse {
- logWarn(s"[flink-submit] RestAPI Submit Plan failed,try JobGraph Submit
Plan now.")
- Try(jobGraphFunc(submitRequest, flinkConfig)) match {
- case Success(r) => r
- case Failure(e) =>
- logError(s"[flink-submit] Both Rest API Submit Plan and JobGraph
Submit Plan failed.")
- throw e
- }
+ logInfo(s"[flink-submit] Submit job with JobGraph Plan.")
+ jobGraphFunc(submitRequest, flinkConfig)
+ } match {
+ case Failure(e) =>
+ logWarn(
+ s"""\n
+ |[flink-submit] JobGraph Submit Plan failed, error detail:
+
|------------------------------------------------------------------
+ |${ExceptionUtils.stringifyException(e)}
+
|------------------------------------------------------------------
+ |Now retry submit with RestAPI Plan ...
+ |""".stripMargin
+ )
+ Try(restApiFunc(submitRequest, flinkConfig)) match {
+ case Success(r) => r
+ case Failure(e) =>
+ logError(
+ s"""\n
+ |[flink-submit] RestAPI Submit failed, error detail:
+
|------------------------------------------------------------------
+ |${ExceptionUtils.stringifyException(e)}
+
|------------------------------------------------------------------
+ |Both JobGraph submit plan and Rest API submit plan all
failed!
+ |""".stripMargin
+ )
+ throw e
+ }
+ case Success(v) => v
}
}
private[client] def getJobGraph(
submitRequest: SubmitRequest,
flinkConfig: Configuration): (PackagedProgram, JobGraph) = {
- if (submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK) {
- val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
- if (!FsOperator.lfs.exists(pythonVenv)) {
- throw new RuntimeException(s"$pythonVenv File does not exist")
- }
-
- val localLib: String =
s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
- if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib))
{
- flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
- }
- flinkConfig
- // python.archives
- .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
- // python.client.executable
- .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE,
Constant.PYTHON_EXECUTABLE)
- // python.executable
- .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
- }
-
- val packageProgram = PackagedProgram.newBuilder
+ val pkgBuilder = PackagedProgram.newBuilder
+ .setUserClassPaths(
+ Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*)
+ )
+ .setEntryPointClassName(
+
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
+ )
.setArguments(
flinkConfig
.getOptional(ApplicationConfiguration.APPLICATION_ARGS)
- .orElse(Lists.newArrayList()): _*)
- .setEntryPointClassName(
-
flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
+ .orElse(Lists.newArrayList()): _*
)
.setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
- .build()
+ submitRequest.developmentMode match {
+ case FlinkDevelopmentMode.PYFLINK =>
+ val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
+ if (!FsOperator.lfs.exists(pythonVenv)) {
+ throw new RuntimeException(s"$pythonVenv File does not exist")
+ }
+ // including $app/lib
+ includingPipelineJars(submitRequest, flinkConfig)
+ flinkConfig
+ // python.archives
+ .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
+ // python.client.executable
+ .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE,
Constant.PYTHON_EXECUTABLE)
+ // python.executable
+ .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
+ case _ =>
+ pkgBuilder.setJarFile(submitRequest.userJarFile)
+ }
+
+ val packageProgram = pkgBuilder.build()
val jobGraph = PackagedProgramUtils.createJobGraph(
packageProgram,
flinkConfig,
@@ -433,60 +455,7 @@ trait FlinkClientTrait extends Logger {
private[this] def extractProgramArgs(submitRequest: SubmitRequest):
JavaList[String] = {
val programArgs = new ArrayBuffer[String]()
- val args = submitRequest.args
-
- if (StringUtils.isNotBlank(args)) {
- val multiChar = "\""
- val array = args.split("\\s+")
- if (!array.exists(_.startsWith(multiChar))) {
- array.foreach(programArgs +=)
- } else {
- val argsArray = new ArrayBuffer[String]()
- val tempBuffer = new ArrayBuffer[String]()
-
- @tailrec
- def processElement(index: Int, multi: Boolean): Unit = {
-
- if (index == array.length) {
- if (tempBuffer.nonEmpty) {
- argsArray += tempBuffer.mkString(" ")
- }
- return
- }
-
- val next = index + 1
- val elem = array(index).trim
-
- if (elem.isEmpty) {
- processElement(next, multi = false)
- } else {
- if (multi) {
- if (elem.endsWith(multiChar)) {
- tempBuffer += elem.dropRight(1)
- argsArray += tempBuffer.mkString(" ")
- tempBuffer.clear()
- processElement(next, multi = false)
- } else {
- tempBuffer += elem
- processElement(next, multi)
- }
- } else {
- val until = if (elem.endsWith(multiChar)) 1 else 0
- if (elem.startsWith(multiChar)) {
- tempBuffer += elem.drop(1).dropRight(until)
- processElement(next, multi = true)
- } else {
- argsArray += elem.dropRight(until)
- processElement(next, multi = false)
- }
- }
- }
- }
-
- processElement(0, multi = false)
- argsArray.foreach(x => programArgs += x)
- }
- }
+ programArgs ++= PropertiesUtils.extractArguments(submitRequest.args)
if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
@@ -522,7 +491,7 @@ trait FlinkClientTrait extends Logger {
programArgs.add("-py")
programArgs.add(submitRequest.userJarFile.getAbsolutePath)
}
- programArgs.toList.asJava
+ Lists.newArrayList(programArgs: _*)
}
private[this] def applyConfiguration(
@@ -626,4 +595,13 @@ trait FlinkClientTrait extends Logger {
clientWrapper.triggerSavepoint(jobID, savepointPath,
savepointRequest.nativeFormat).get()
}
+ private[client] def includingPipelineJars(
+ submitRequest: SubmitRequest,
+ flinkConfig: Configuration) = {
+ val localLib: String =
s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
+ if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
+ flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
+ }
+ }
+
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index 9d6a09339..5d9af1ec4 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -22,7 +22,7 @@ import org.apache.streampark.flink.core.SqlCommand._
import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{Configuration, ExecutionOptions}
import org.apache.flink.table.api.TableEnvironment
import java.util
@@ -39,6 +39,7 @@ object FlinkSqlExecutor extends Logger {
sql: String,
parameter: ParameterTool,
context: TableEnvironment)(implicit callbackFunc: String => Unit =
null): Unit = {
+
val flinkSql: String =
if (StringUtils.isBlank(sql)) parameter.get(KEY_FLINK_SQL()) else
parameter.get(sql)
require(StringUtils.isNotBlank(flinkSql), "verify failed: flink sql cannot
be empty")
@@ -50,6 +51,8 @@ object FlinkSqlExecutor extends Logger {
}
}
+ val runMode = parameter.get(ExecutionOptions.RUNTIME_MODE.key())
+
var hasInsert = false
val statementSet = context.createStatementSet()
SqlCommandParser
@@ -121,6 +124,13 @@ object FlinkSqlExecutor extends Logger {
case SELECT =>
logError("StreamPark dose not support 'SELECT' statement now!")
throw new RuntimeException("StreamPark dose not support 'select'
statement now!")
+ case DELETE | UPDATE =>
+ if (runMode == "STREAMING") {
+ throw new UnsupportedOperationException(
+ s"Currently, ${command.toUpperCase()} statement only
supports in batch mode, " +
+ s"and it requires the target table connector implements
the SupportsRowLevelDelete, " +
+ s"For more details please refer to:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/$command")
+ }
case _ =>
try {
lock.lock()
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index 157216ce8..12a2fdc47 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -393,6 +393,12 @@ object SqlCommand extends enumeratum.Enum[SqlCommand] {
case object END_STATEMENT_SET
extends SqlCommand("end statement set", "END", Converters.NO_OPERANDS)
+ // Since: 2.1.2 for flink 1.18
+ case object DELETE extends SqlCommand("delete", "(DELETE\\s+FROM\\s+.+)")
+
+ // Since: 2.1.2 for flink 1.18
+ case object UPDATE extends SqlCommand("update", "(UPDATE\\s+.+)")
+
private[this] def cleanUp(sql: String): String =
sql.trim.replaceAll("^(['\"])|(['\"])$", "")
}
diff --git
a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index e305f428b..d3ea70c9e 100644
---
a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++
b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -27,11 +27,14 @@ import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.ExecutionOptions
+import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import scala.util.{Failure, Success, Try}
object SqlClient extends App {
+ val arguments = ArrayBuffer(args: _*)
+
private[this] val parameterTool = ParameterTool.fromArgs(args)
private[this] val flinkSql = {
@@ -46,33 +49,37 @@ object SqlClient extends App {
private[this] val sets =
SqlCommandParser.parseSQL(flinkSql).filter(_.command == SqlCommand.SET)
- private[this] val defaultMode =
RuntimeExecutionMode.STREAMING.name().toLowerCase()
+ private[this] val defaultMode = RuntimeExecutionMode.STREAMING.name()
private[this] val mode = sets.find(_.operands.head ==
ExecutionOptions.RUNTIME_MODE.key()) match {
case Some(e) =>
// 1) flink sql execution.runtime-mode has highest priority
- e.operands(1)
+ val m = e.operands(1).toUpperCase()
+ arguments += s"-D${ExecutionOptions.RUNTIME_MODE.key()}=$m"
+ m
case None =>
// 2) dynamic properties execution.runtime-mode
parameterTool.get(ExecutionOptions.RUNTIME_MODE.key(), null) match {
case null =>
- parameterTool.get(KEY_APP_CONF(), null) match {
+ val m = parameterTool.get(KEY_APP_CONF(), null) match {
case null => defaultMode
case f =>
val parameter =
PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(f.drop(7)))
// 3) application conf execution.runtime-mode
- parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode)
+ parameter.getOrElse(KEY_FLINK_TABLE_MODE,
defaultMode).toUpperCase()
}
+ arguments += s"-D${ExecutionOptions.RUNTIME_MODE.key()}=$m"
+ m
case m => m
}
}
mode match {
- case "batch" => BatchSqlApp.main(args)
- case "streaming" => StreamSqlApp.main(args)
+ case "STREAMING" | "AUTOMATIC" => StreamSqlApp.main(arguments.toArray)
+ case "BATCH" => BatchSqlApp.main(arguments.toArray)
case _ =>
throw new IllegalArgumentException(
- "Usage: runtime execution-mode invalid, optional [streaming|batch]")
+ "Usage: runtime execution-mode invalid, optional
[STREAMING|BATCH|AUTOMATIC]")
}
private[this] object BatchSqlApp extends FlinkTable {