This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new ad79b287a [Improve] delete|update syntax support
ad79b287a is described below

commit ad79b287a10bd721b0bdacfaa4ca2f6cc11b11b4
Author: benjobs <[email protected]>
AuthorDate: Sun Oct 29 14:09:19 2023 +0800

    [Improve] delete|update syntax support
---
 .../streampark/flink/core/FlinkSqlExecutor.scala   | 14 +++++++++---
 .../streampark/flink/core/SqlCommandParser.scala   |  6 ++++++
 .../apache/streampark/flink/cli/SqlClient.scala    | 25 +++++++++++++++-------
 3 files changed, 34 insertions(+), 11 deletions(-)
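
In short, this change teaches the SQL parser to recognize Flink 1.18's row-level DELETE and UPDATE statements and lets them run in batch mode. A hypothetical example of a SQL script that now parses (table, column, and predicate invented):

    // Hypothetical script the parser now accepts (names invented):
    val sql =
      """
        |SET 'execution.runtime-mode' = 'batch';
        |DELETE FROM t_user WHERE id = 1;
        |UPDATE t_user SET name = 'streampark' WHERE id = 1;
        |""".stripMargin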

diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index ee1aa7a92..51e498eb7 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -20,8 +20,9 @@ import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.core.SqlCommand._
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{Configuration, ExecutionOptions}
 import org.apache.flink.table.api.TableEnvironment
 
 import java.util
@@ -38,9 +39,10 @@ object FlinkSqlExecutor extends Logger {
       sql: String,
       parameter: ParameterTool,
      context: TableEnvironment)(implicit callbackFunc: String => Unit = null): Unit = {
+
     val flinkSql: String =
-      if (sql == null || sql.isEmpty) parameter.get(KEY_FLINK_SQL()) else parameter.get(sql)
-    require(flinkSql != null && flinkSql.trim.nonEmpty, "verify failed: flink sql cannot be empty")
+      if (StringUtils.isBlank(sql)) parameter.get(KEY_FLINK_SQL()) else parameter.get(sql)
+    require(StringUtils.isNotBlank(flinkSql), "verify failed: flink sql cannot be empty")
 
     def callback(r: String): Unit = {
       callbackFunc match {
@@ -49,6 +51,8 @@ object FlinkSqlExecutor extends Logger {
       }
     }
 
+    val runMode = parameter.get(ExecutionOptions.RUNTIME_MODE.key())
+
     var hasInsert = false
     val statementSet = context.createStatementSet()
     SqlCommandParser
@@ -120,6 +124,10 @@ object FlinkSqlExecutor extends Logger {
            case SELECT =>
              logError("StreamPark does not support 'SELECT' statement now!")
              throw new RuntimeException("StreamPark does not support 'select' statement now!")
+            case DELETE | UPDATE =>
+              if (runMode == "STREAMING") {
+                throw new UnsupportedOperationException(s"$command is unsupported in streaming mode.")
+              }
             case _ =>
               try {
                 lock.lock()
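
Note: the hunk above reads execution.runtime-mode from the job parameters and rejects row-level DELETE/UPDATE when the job runs in streaming mode, since Flink 1.18 executes these statements only in batch mode. A minimal standalone sketch of the same guard (names invented, assuming the mode string arrives upper-cased, as SqlClient below now guarantees):

    // Hypothetical stand-in for the guard in FlinkSqlExecutor above.
    def rejectRowLevelInStreaming(command: String, runMode: String): Unit = {
      val isRowLevel = command == "DELETE" || command == "UPDATE"
      if (isRowLevel && runMode == "STREAMING") {
        throw new UnsupportedOperationException(s"$command is unsupported in streaming mode.")
      }
    }

    rejectRowLevelInStreaming("DELETE", "BATCH") // ok: batch mode supports row-level deletes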
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index e77bf5349..1f00d0fce 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -391,6 +391,12 @@ object SqlCommand extends enumeratum.Enum[SqlCommand] {
   case object END_STATEMENT_SET
     extends SqlCommand("end statement set", "END", Converters.NO_OPERANDS)
 
+  // Since: 2.1.2 for flink 1.18
+  case object DELETE extends SqlCommand("delete", "(DELETE\\s+FROM\\s+.+)")
+
+  // Since: 2.1.2 for flink 1.18
+  case object UPDATE extends SqlCommand("update", "(UPDATE\\s+.+)")
+
  private[this] def cleanUp(sql: String): String = sql.trim.replaceAll("^(['\"])|(['\"])$", "")
 
 }
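
For reference, a quick check of what the two new patterns match. The CASE_INSENSITIVE flag is an assumption about how SqlCommandParser compiles its patterns; the statements are invented:

    import java.util.regex.Pattern

    // Hypothetical verification of the DELETE/UPDATE regexes added above.
    val delete = Pattern.compile("(DELETE\\s+FROM\\s+.+)", Pattern.CASE_INSENSITIVE)
    val update = Pattern.compile("(UPDATE\\s+.+)", Pattern.CASE_INSENSITIVE)

    assert(delete.matcher("DELETE FROM t_user WHERE id = 1").matches())
    assert(update.matcher("UPDATE t_user SET name = 'x' WHERE id = 1").matches())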
diff --git a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index 76239c6cb..c4be0ab58 100644
--- a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++ b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -22,19 +22,24 @@ import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
 import org.apache.streampark.flink.core.{SqlCommand, SqlCommandParser}
 import org.apache.streampark.flink.core.scala.{FlinkStreamTable, FlinkTable}
 
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.ExecutionOptions
 
+import scala.collection.mutable.ArrayBuffer
 import scala.language.implicitConversions
 import scala.util.{Failure, Success, Try}
 
 object SqlClient extends App {
 
+  val arguments = ArrayBuffer(args: _*)
+
   private[this] val parameterTool = ParameterTool.fromArgs(args)
 
   private[this] val flinkSql = {
     val sql = parameterTool.get(KEY_FLINK_SQL())
-    require(sql != null && sql.trim.nonEmpty, "Usage: flink sql cannot be null")
+    require(StringUtils.isNotBlank(sql), "Usage: flink sql cannot be null")
     Try(DeflaterUtils.unzipString(sql)) match {
       case Success(value) => value
       case Failure(_) =>
@@ -44,33 +49,37 @@ object SqlClient extends App {
 
  private[this] val sets = SqlCommandParser.parseSQL(flinkSql).filter(_.command == SqlCommand.SET)
 
-  private[this] val defaultMode = "streaming"
+  private[this] val defaultMode = RuntimeExecutionMode.STREAMING.name()
 
  private[this] val mode = sets.find(_.operands.head == ExecutionOptions.RUNTIME_MODE.key()) match {
     case Some(e) =>
       // 1) flink sql execution.runtime-mode has highest priority
-      e.operands(1)
+      val m = e.operands(1).toUpperCase()
+      arguments += s"-D${ExecutionOptions.RUNTIME_MODE.key()}=$m"
+      m
     case None =>
       // 2) dynamic properties execution.runtime-mode
       parameterTool.get(ExecutionOptions.RUNTIME_MODE.key(), null) match {
         case null =>
-          parameterTool.get(KEY_APP_CONF(), null) match {
+          val m = parameterTool.get(KEY_APP_CONF(), null) match {
             case null => defaultMode
             case f =>
               val parameter = PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(f.drop(7)))
               // 3) application conf execution.runtime-mode
-              parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode)
+              parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode).toUpperCase()
           }
+          arguments += s"-D${ExecutionOptions.RUNTIME_MODE.key()}=$m"
+          m
         case m => m
       }
   }
 
   mode match {
-    case "batch" => BatchSqlApp.main(args)
-    case "streaming" => StreamSqlApp.main(args)
+    case "STREAMING" | "AUTOMATIC" => StreamSqlApp.main(arguments.toArray)
+    case "BATCH" => BatchSqlApp.main(arguments.toArray)
     case _ =>
       throw new IllegalArgumentException(
-        "Usage: runtime execution-mode invalid, optional [streaming|batch]")
+        "Usage: runtime execution-mode invalid, optional 
[STREAMING|BATCH|AUTOMATIC]")
   }
 
   private[this] object BatchSqlApp extends FlinkTable {
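
The SqlClient change above resolves the runtime mode with a three-level precedence and now forwards the result to the launched app as an extra -Dexecution.runtime-mode argument, so FlinkSqlExecutor can read it back. A condensed sketch of the precedence (helper name and signature invented):

    // Hypothetical summary of the resolution order implemented above:
    // 1) SET 'execution.runtime-mode' inside the SQL itself (highest priority)
    // 2) the dynamic property execution.runtime-mode
    // 3) execution.runtime-mode from the application conf, else STREAMING
    def resolveRuntimeMode(
        fromSqlSet: Option[String],
        fromDynamicProps: Option[String],
        fromAppConf: Option[String]): String =
      fromSqlSet
        .orElse(fromDynamicProps)
        .orElse(fromAppConf)
        .getOrElse("STREAMING")
        .toUpperCase()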

Reply via email to