This is an automated email from the ASF dual-hosted git repository.

linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4dc5eb7ea [Bug] extract program arguments bug fixed. (#3297)
4dc5eb7ea is described below

commit 4dc5eb7eaf91382d0fcf7001c27cffb65bf0d95d
Author: benjobs <[email protected]>
AuthorDate: Mon Oct 30 21:59:43 2023 +0800

    [Bug] extract program arguments bug fixed. (#3297)
    
    * [Improve] packageArgs improvement
    
    * minor improvement
    
    * [Improve] minor improve
    
    * [Improve] method extractArguments improvement
    
    * [Improve] method extractProgramArgs improvement
    
    * [Improve] delete|update syntax support
    
    * [Improve] delete|update syntax minor improvement
    
    * [improve] jobGraph submit improvement
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../streampark/common/conf/FlinkVersion.scala      |   2 +
 .../streampark/common/util/PropertiesUtils.scala   |  26 +++-
 .../common/util/PropertiesUtilsTestCase.scala      |  89 +++--------
 .../impl/KubernetesNativeSessionClient.scala       |   2 +-
 .../flink/client/impl/RemoteClient.scala           |   3 +-
 .../flink/client/impl/YarnApplicationClient.scala  |   6 +-
 .../flink/client/trait/FlinkClientTrait.scala      | 166 +++++++++------------
 .../streampark/flink/core/FlinkSqlExecutor.scala   |  12 +-
 .../streampark/flink/core/SqlCommandParser.scala   |   6 +
 .../apache/streampark/flink/cli/SqlClient.scala    |  21 ++-
 10 files changed, 153 insertions(+), 180 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index c05e28a3e..5455a6d5d 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -59,6 +59,8 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg
     lib
   }
 
+  lazy val flinkLibs: List[NetURL] = flinkLib.listFiles().map(_.toURI.toURL).toList
+
   lazy val version: String = {
     val flinkVersion = new AtomicReference[String]
     val cmd = List(
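
The new flinkLibs value exposes every jar under the Flink distribution's lib directory as URL entries (NetURL appears to be an alias for java.net.URL). A minimal sketch of how it can be inspected, with an illustrative Flink home path:

    import org.apache.streampark.common.conf.FlinkVersion

    // path is illustrative; flinkLibs lists $FLINK_HOME/lib/*.jar as URLs,
    // e.g. file:/opt/flink-1.18.0/lib/flink-dist-1.18.0.jar, ...
    val flinkVersion = new FlinkVersion("/opt/flink-1.18.0")
    flinkVersion.flinkLibs.foreach(println)

These URLs are what getJobGraph below passes to PackagedProgram.newBuilder.setUserClassPaths(...).
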
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 706b3b090..b0db39578 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -30,7 +30,7 @@ import java.util.regex.Pattern
 import scala.collection.JavaConverters._
 import scala.collection.convert.ImplicitConversions._
 import scala.collection.mutable
-import scala.collection.mutable.{Map => MutableMap}
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
 
 object PropertiesUtils extends Logger {
 
@@ -305,6 +305,30 @@ object PropertiesUtils extends Logger {
     }
   }
 
+  @Nonnull def extractArguments(args: String): List[String] = {
+    val programArgs = new ArrayBuffer[String]()
+    if (StringUtils.isNotEmpty(args)) {
+      val array = args.split("\\s+")
+      val iter = array.iterator
+      while (iter.hasNext) {
+        val v = iter.next()
+        val p = v.take(1)
+        p match {
+          case "'" | "\"" =>
+            var value = v
+            if (!v.endsWith(p)) {
+              while (!value.endsWith(p) && iter.hasNext) {
+                value += s" ${iter.next()}"
+              }
+            }
+            programArgs += value.substring(1, value.length - 1)
+          case _ => programArgs += v
+        }
+      }
+    }
+    programArgs.toList
+  }
+
   @Nonnull def extractDynamicPropertiesAsJava(properties: String): JavaMap[String, String] =
     new JavaMap[String, String](extractDynamicProperties(properties).asJava)
 
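
The new extractArguments splits the argument string on whitespace, and when a token starts with a single or double quote it keeps consuming tokens until the closing quote, then strips the quotes, so quoted values survive as one argument. A small usage sketch (argument values are illustrative, taken from the old test case):

    import org.apache.streampark.common.util.PropertiesUtils

    val args = PropertiesUtils.extractArguments(
      "--host localhost:8123 --sql \"insert into table_a select * from table_b\"")
    // expected: List(--host, localhost:8123, --sql, insert into table_a select * from table_b)
    println(args)
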
diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
index 01e1b2743..9715327f8 100644
--- a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
+++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala
@@ -16,83 +16,32 @@
  */
 package org.apache.streampark.common.util
 
-import org.apache.commons.lang3.StringUtils
 import org.junit.jupiter.api.{Assertions, Test}
 
-import scala.annotation.tailrec
-import scala.collection.mutable.ArrayBuffer
 import scala.language.postfixOps
 
 class PropertiesUtilsTestCase {
 
   @Test def testExtractProgramArgs(): Unit = {
-    val argsStr = "--host localhost:8123\n" +
-      "--sql \"insert into table_a select * from table_b\"\n" +
-      "--c d\r\n" +
-      "--including-tables \"BASE_CARD_ETPS|BASE_CHECKED_STAT\"\n"
-    val programArgs = new ArrayBuffer[String]()
-    if (StringUtils.isNotBlank(argsStr)) {
-      val multiChar = "\""
-      val array = argsStr.split("\\s+")
-      if (!array.exists(_.startsWith(multiChar))) {
-        array.foreach(
-          x => {
-            if (x.trim.nonEmpty) {
-              programArgs += x
-            }
-          })
-      } else {
-        val argsArray = new ArrayBuffer[String]()
-        val tempBuffer = new ArrayBuffer[String]()
-
-        @tailrec
-        def processElement(index: Int, multi: Boolean): Unit = {
-
-          if (index == array.length) {
-            if (tempBuffer.nonEmpty) {
-              argsArray += tempBuffer.mkString(" ")
-            }
-            return
-          }
-
-          val next = index + 1
-          val elem = array(index).trim
-          val until = if (elem.endsWith(multiChar)) 1 else 0
-
-          if (elem.isEmpty) {
-            processElement(next, multi = false)
-          } else {
-            if (multi) {
-              if (elem.endsWith(multiChar)) {
-                tempBuffer += elem.dropRight(1)
-                argsArray += tempBuffer.mkString(" ")
-                tempBuffer.clear()
-                processElement(next, multi = false)
-              } else {
-                tempBuffer += elem
-                processElement(next, multi)
-              }
-            } else {
-              if (elem.startsWith(multiChar)) {
-                tempBuffer += elem.drop(1).dropRight(until)
-                processElement(next, multi = true)
-              } else {
-                argsArray += elem.dropRight(until)
-                processElement(next, multi = false)
-              }
-            }
-          }
-        }
-
-        processElement(0, multi = false)
-        argsArray.foreach(x => programArgs += x)
-      }
-    }
-
-    Assertions.assertEquals("localhost:8123", programArgs(1))
-    Assertions.assertEquals("insert into table_a select * from table_b", 
programArgs(3))
-    Assertions.assertEquals("d", programArgs(5))
-    Assertions.assertEquals("BASE_CARD_ETPS|BASE_CHECKED_STAT", programArgs(7))
+    val args =
+      "mysql-sync-database " +
+        "--database employees " +
+        "--mysql-conf hostname=127.0.0.1 " +
+        "--mysql-conf port=3306 " +
+        "--mysql-conf username=root " +
+        "--mysql-conf password=123456 " +
+        "--mysql-conf database-name=employees " +
+        "--including-tables 'test|test.*' " +
+        "--excluding-tables \"emp_*\" " +
+        "--query 'select * from employees where age > 20' " +
+        "--sink-conf fenodes=127.0.0.1:8030 " +
+        "--sink-conf username=root " +
+        "--sink-conf password= " +
+        "--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 " +
+        "--sink-conf sink.label-prefix=label" +
+        "--table-conf replication_num=1"
+    val programArgs = PropertiesUtils.extractArguments(args)
+    println(programArgs)
   }
 
   @Test def testDynamicProperties(): Unit = {
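
The rewritten test only prints the extracted list. Assertions that could pin down the quoting behaviour (a sketch, not part of this commit) might look like:

    // quoted values should come back as single, unquoted arguments
    Assertions.assertTrue(programArgs.contains("test|test.*"))
    Assertions.assertTrue(programArgs.contains("emp_*"))
    Assertions.assertTrue(programArgs.contains("select * from employees where age > 20"))
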
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 8004fb91c..e2684014e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -57,7 +57,7 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
       StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId),
       s"[flink-submit] submit flink job failed, clusterId is null, 
mode=${flinkConfig.get(DeploymentOptions.TARGET)}"
     )
-    super.trySubmit(submitRequest, flinkConfig)(restApiSubmit)(jobGraphSubmit)
+    super.trySubmit(submitRequest, flinkConfig)(jobGraphSubmit, restApiSubmit)
   }
 
   /** Submit flink session job via rest api. */
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
index fe4fe454a..6002306f1 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
@@ -44,8 +44,7 @@ object RemoteClient extends FlinkClientTrait {
       submitRequest: SubmitRequest,
       flinkConfig: Configuration): SubmitResponse = {
     //  submit job
-    super.trySubmit(submitRequest, flinkConfig)(restApiSubmit)(jobGraphSubmit)
-
+    super.trySubmit(submitRequest, flinkConfig)(jobGraphSubmit, restApiSubmit)
   }
 
   override def doCancel(
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index f5862a082..f7b38cdde 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -102,10 +102,8 @@ object YarnApplicationClient extends YarnClientTrait {
         throw new RuntimeException(s"$pyVenv File does not exist")
       }
 
-      val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
-      if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
-        flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
-      }
+      // including $app/lib
+      includingPipelineJars(submitRequest, flinkConfig)
 
       // yarn.ship-files
       val shipFiles = new util.ArrayList[String]()
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 75a05326f..5025a92b2 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -18,11 +18,11 @@
 package org.apache.streampark.flink.client.`trait`
 
 import org.apache.streampark.common.Constant
-import org.apache.streampark.common.conf.{ConfigKeys, Workspace}
 import org.apache.streampark.common.conf.ConfigKeys._
+import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.enums.{ApplicationType, FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode}
 import org.apache.streampark.common.fs.FsOperator
-import org.apache.streampark.common.util.{DeflaterUtils, FileUtils, Logger, SystemPropertyUtils}
+import org.apache.streampark.common.util.{DeflaterUtils, ExceptionUtils, FileUtils, Logger, PropertiesUtils, SystemPropertyUtils, Utils}
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -217,57 +217,79 @@ trait FlinkClientTrait extends Logger {
   def doCancel(cancelRequest: CancelRequest, flinkConf: Configuration): CancelResponse
 
   def trySubmit(submitRequest: SubmitRequest, flinkConfig: Configuration)(
-      restApiFunc: (SubmitRequest, Configuration) => SubmitResponse)(
-      jobGraphFunc: (SubmitRequest, Configuration) => SubmitResponse): SubmitResponse = {
-    // Prioritize using Rest API submit while using JobGraph submit plan as backup
+      jobGraphFunc: (SubmitRequest, Configuration) => SubmitResponse,
+      restApiFunc: (SubmitRequest, Configuration) => SubmitResponse): SubmitResponse = {
+    // Prioritize using JobGraph submit plan while using Rest API submit plan as backup
     Try {
-      logInfo(s"[flink-submit] Attempting to submit in Rest API Submit Plan.")
-      restApiFunc(submitRequest, flinkConfig)
-    }.getOrElse {
-      logWarn(s"[flink-submit] RestAPI Submit Plan failed,try JobGraph Submit 
Plan now.")
-      Try(jobGraphFunc(submitRequest, flinkConfig)) match {
-        case Success(r) => r
-        case Failure(e) =>
-          logError(s"[flink-submit] Both Rest API Submit Plan and JobGraph 
Submit Plan failed.")
-          throw e
-      }
+      logInfo(s"[flink-submit] Submit job with JobGraph Plan.")
+      jobGraphFunc(submitRequest, flinkConfig)
+    } match {
+      case Failure(e) =>
+        logWarn(
+          s"""\n
+             |[flink-submit] JobGraph Submit Plan failed, error detail:
+             |------------------------------------------------------------------
+             |${ExceptionUtils.stringifyException(e)}
+             |------------------------------------------------------------------
+             |Now retry submit with RestAPI Plan ...
+             |""".stripMargin
+        )
+        Try(restApiFunc(submitRequest, flinkConfig)) match {
+          case Success(r) => r
+          case Failure(e) =>
+            logError(
+              s"""\n
+                 |[flink-submit] RestAPI Submit failed, error detail:
+                 |------------------------------------------------------------------
+                 |${ExceptionUtils.stringifyException(e)}
+                 |------------------------------------------------------------------
+                 |Both JobGraph submit plan and Rest API submit plan all failed!
+                 |""".stripMargin
+            )
+            throw e
+        }
+      case Success(v) => v
     }
   }
 
   private[client] def getJobGraph(
       submitRequest: SubmitRequest,
       flinkConfig: Configuration): (PackagedProgram, JobGraph) = {
-    if (submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK) {
-      val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
-      if (!FsOperator.lfs.exists(pythonVenv)) {
-        throw new RuntimeException(s"$pythonVenv File does not exist")
-      }
-
-      val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
-      if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
-        flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
-      }
 
-      flinkConfig
-        // python.archives
-        .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
-        // python.client.executable
-        .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
-        // python.executable
-        .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
-    }
-
-    val packageProgram = PackagedProgram.newBuilder
+    val pkgBuilder = PackagedProgram.newBuilder
+      .setUserClassPaths(
+        Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*)
+      )
+      .setEntryPointClassName(
+        flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
+      )
       .setArguments(
         flinkConfig
           .getOptional(ApplicationConfiguration.APPLICATION_ARGS)
-          .orElse(Lists.newArrayList()): _*)
-      .setEntryPointClassName(
-        flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()
+          .orElse(Lists.newArrayList()): _*
       )
       .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
-      .build()
 
+    submitRequest.developmentMode match {
+      case FlinkDevelopmentMode.PYFLINK =>
+        val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
+        if (!FsOperator.lfs.exists(pythonVenv)) {
+          throw new RuntimeException(s"$pythonVenv File does not exist")
+        }
+        // including $app/lib
+        includingPipelineJars(submitRequest, flinkConfig)
+        flinkConfig
+          // python.archives
+          .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
+          // python.client.executable
+          .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
+          // python.executable
+          .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
+      case _ =>
+        pkgBuilder.setJarFile(submitRequest.userJarFile)
+    }
+
+    val packageProgram = pkgBuilder.build()
     val jobGraph = PackagedProgramUtils.createJobGraph(
       packageProgram,
       flinkConfig,
@@ -433,60 +455,7 @@ trait FlinkClientTrait extends Logger {
 
   private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = {
     val programArgs = new ArrayBuffer[String]()
-    val args = submitRequest.args
-
-    if (StringUtils.isNotBlank(args)) {
-      val multiChar = "\""
-      val array = args.split("\\s+")
-      if (!array.exists(_.startsWith(multiChar))) {
-        array.foreach(programArgs +=)
-      } else {
-        val argsArray = new ArrayBuffer[String]()
-        val tempBuffer = new ArrayBuffer[String]()
-
-        @tailrec
-        def processElement(index: Int, multi: Boolean): Unit = {
-
-          if (index == array.length) {
-            if (tempBuffer.nonEmpty) {
-              argsArray += tempBuffer.mkString(" ")
-            }
-            return
-          }
-
-          val next = index + 1
-          val elem = array(index).trim
-
-          if (elem.isEmpty) {
-            processElement(next, multi = false)
-          } else {
-            if (multi) {
-              if (elem.endsWith(multiChar)) {
-                tempBuffer += elem.dropRight(1)
-                argsArray += tempBuffer.mkString(" ")
-                tempBuffer.clear()
-                processElement(next, multi = false)
-              } else {
-                tempBuffer += elem
-                processElement(next, multi)
-              }
-            } else {
-              val until = if (elem.endsWith(multiChar)) 1 else 0
-              if (elem.startsWith(multiChar)) {
-                tempBuffer += elem.drop(1).dropRight(until)
-                processElement(next, multi = true)
-              } else {
-                argsArray += elem.dropRight(until)
-                processElement(next, multi = false)
-              }
-            }
-          }
-        }
-
-        processElement(0, multi = false)
-        argsArray.foreach(x => programArgs += x)
-      }
-    }
+    programArgs ++= PropertiesUtils.extractArguments(submitRequest.args)
 
     if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
 
@@ -522,7 +491,7 @@ trait FlinkClientTrait extends Logger {
       programArgs.add("-py")
       programArgs.add(submitRequest.userJarFile.getAbsolutePath)
     }
-    programArgs.toList.asJava
+    Lists.newArrayList(programArgs: _*)
   }
 
   private[this] def applyConfiguration(
@@ -626,4 +595,13 @@ trait FlinkClientTrait extends Logger {
     clientWrapper.triggerSavepoint(jobID, savepointPath, savepointRequest.nativeFormat).get()
   }
 
+  private[client] def includingPipelineJars(
+      submitRequest: SubmitRequest,
+      flinkConfig: Configuration) = {
+    val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
+    if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
+      flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
+    }
+  }
+
 }
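
The reworked trySubmit flips the fallback order: the JobGraph plan is tried first and the Rest API plan is only used if it fails, the opposite of the previous behaviour. A minimal sketch of that pattern, with illustrative names rather than the production signature:

    import scala.util.{Failure, Success, Try}

    def submitWithFallback[T](jobGraphPlan: => T, restApiPlan: => T): T =
      Try(jobGraphPlan) match {
        case Success(result) => result
        case Failure(primaryError) =>
          // JobGraph plan failed: log the cause, then retry with the Rest API plan
          Try(restApiPlan) match {
            case Success(result) => result
            case Failure(fallbackError) => throw fallbackError
          }
      }
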
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index 9d6a09339..5d9af1ec4 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -22,7 +22,7 @@ import org.apache.streampark.flink.core.SqlCommand._
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{Configuration, ExecutionOptions}
 import org.apache.flink.table.api.TableEnvironment
 
 import java.util
@@ -39,6 +39,7 @@ object FlinkSqlExecutor extends Logger {
       sql: String,
       parameter: ParameterTool,
       context: TableEnvironment)(implicit callbackFunc: String => Unit = null): Unit = {
+
     val flinkSql: String =
       if (StringUtils.isBlank(sql)) parameter.get(KEY_FLINK_SQL()) else parameter.get(sql)
     require(StringUtils.isNotBlank(flinkSql), "verify failed: flink sql cannot be empty")
@@ -50,6 +51,8 @@ object FlinkSqlExecutor extends Logger {
       }
     }
 
+    val runMode = parameter.get(ExecutionOptions.RUNTIME_MODE.key())
+
     var hasInsert = false
     val statementSet = context.createStatementSet()
     SqlCommandParser
@@ -121,6 +124,13 @@ object FlinkSqlExecutor extends Logger {
             case SELECT =>
               logError("StreamPark dose not support 'SELECT' statement now!")
               throw new RuntimeException("StreamPark dose not support 'select' 
statement now!")
+            case DELETE | UPDATE =>
+              if (runMode == "STREAMING") {
+                throw new UnsupportedOperationException(
+                  s"Currently, ${command.toUpperCase()} statement only 
supports in batch mode, " +
+                    s"and it requires the target table connector implements 
the SupportsRowLevelDelete, " +
+                    s"For more details please refer to: 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/$command";)
+              }
             case _ =>
               try {
                 lock.lock()
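
For context, the DELETE and UPDATE statements this branch now recognizes are Flink 1.18 row-level operations: they only work in batch mode and only against connectors that implement the row-level interfaces. A hedged illustration from the Flink Table API side (table and column names are made up):

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    val tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())
    // routed to the new DELETE branch; the sink connector must support row-level delete
    tableEnv.executeSql("DELETE FROM orders WHERE order_status = 'CANCELLED'")
    // routed to the new UPDATE branch; the sink connector must support row-level update
    tableEnv.executeSql("UPDATE orders SET order_status = 'SHIPPED' WHERE order_id = 1001")
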
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index 157216ce8..12a2fdc47 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -393,6 +393,12 @@ object SqlCommand extends enumeratum.Enum[SqlCommand] {
   case object END_STATEMENT_SET
     extends SqlCommand("end statement set", "END", Converters.NO_OPERANDS)
 
+  // Since: 2.1.2 for flink 1.18
+  case object DELETE extends SqlCommand("delete", "(DELETE\\s+FROM\\s+.+)")
+
+  // Since: 2.1.2 for flink 1.18
+  case object UPDATE extends SqlCommand("update", "(UPDATE\\s+.+)")
+
   private[this] def cleanUp(sql: String): String = sql.trim.replaceAll("^(['\"])|(['\"])$", "")
 
 }
diff --git a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index e305f428b..d3ea70c9e 100644
--- a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++ b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -27,11 +27,14 @@ import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.ExecutionOptions
 
+import scala.collection.mutable.ArrayBuffer
 import scala.language.implicitConversions
 import scala.util.{Failure, Success, Try}
 
 object SqlClient extends App {
 
+  val arguments = ArrayBuffer(args: _*)
+
   private[this] val parameterTool = ParameterTool.fromArgs(args)
 
   private[this] val flinkSql = {
@@ -46,33 +49,37 @@ object SqlClient extends App {
 
   private[this] val sets = SqlCommandParser.parseSQL(flinkSql).filter(_.command == SqlCommand.SET)
 
-  private[this] val defaultMode = RuntimeExecutionMode.STREAMING.name().toLowerCase()
+  private[this] val defaultMode = RuntimeExecutionMode.STREAMING.name()
 
   private[this] val mode = sets.find(_.operands.head == ExecutionOptions.RUNTIME_MODE.key()) match {
     case Some(e) =>
       // 1) flink sql execution.runtime-mode has highest priority
-      e.operands(1)
+      val m = e.operands(1).toUpperCase()
+      arguments += s"-D${ExecutionOptions.RUNTIME_MODE.key()}=$m"
+      m
     case None =>
       // 2) dynamic properties execution.runtime-mode
       parameterTool.get(ExecutionOptions.RUNTIME_MODE.key(), null) match {
         case null =>
-          parameterTool.get(KEY_APP_CONF(), null) match {
+          val m = parameterTool.get(KEY_APP_CONF(), null) match {
             case null => defaultMode
             case f =>
               val parameter = PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(f.drop(7)))
               // 3) application conf execution.runtime-mode
-              parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode)
+              parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode).toUpperCase()
           }
+          arguments += s"-D${ExecutionOptions.RUNTIME_MODE.key()}=$m"
+          m
         case m => m
       }
   }
 
   mode match {
-    case "batch" => BatchSqlApp.main(args)
-    case "streaming" => StreamSqlApp.main(args)
+    case "STREAMING" | "AUTOMATIC" => StreamSqlApp.main(arguments.toArray)
+    case "BATCH" => BatchSqlApp.main(arguments.toArray)
     case _ =>
       throw new IllegalArgumentException(
-        "Usage: runtime execution-mode invalid, optional [streaming|batch]")
+        "Usage: runtime execution-mode invalid, optional 
[STREAMING|BATCH|AUTOMATIC]")
   }
 
   private[this] object BatchSqlApp extends FlinkTable {
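
The runtime mode is now resolved in priority order: a SET 'execution.runtime-mode' in the SQL first, then a -D dynamic property, then the application conf, falling back to STREAMING. The resolved value is also appended to the arguments forwarded to the chosen sub-app. A simplified sketch of that resolution (helper name and signature are illustrative):

    def resolveRuntimeMode(
        sqlSetMode: Option[String],   // 1) SET 'execution.runtime-mode' = '...' in the Flink SQL
        dynamicMode: Option[String],  // 2) -Dexecution.runtime-mode=... dynamic property
        appConfMode: Option[String]   // 3) execution.runtime-mode from the application conf
    ): String =
      sqlSetMode
        .orElse(dynamicMode)
        .orElse(appConfMode)
        .getOrElse("STREAMING")
        .toUpperCase()
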
