This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new ad79b287a [Improve] delete|update syntax support
ad79b287a is described below
commit ad79b287a10bd721b0bdacfaa4ca2f6cc11b11b4
Author: benjobs <[email protected]>
AuthorDate: Sun Oct 29 14:09:19 2023 +0800
[Improve] delete|update syntax support
---
.../streampark/flink/core/FlinkSqlExecutor.scala | 14 +++++++++---
.../streampark/flink/core/SqlCommandParser.scala | 6 ++++++
.../apache/streampark/flink/cli/SqlClient.scala | 25 +++++++++++++++-------
3 files changed, 34 insertions(+), 11 deletions(-)
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index ee1aa7a92..51e498eb7 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -20,8 +20,9 @@ import
org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.core.SqlCommand._
+import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{Configuration, ExecutionOptions}
import org.apache.flink.table.api.TableEnvironment
import java.util
@@ -38,9 +39,10 @@ object FlinkSqlExecutor extends Logger {
sql: String,
parameter: ParameterTool,
context: TableEnvironment)(implicit callbackFunc: String => Unit =
null): Unit = {
+
val flinkSql: String =
- if (sql == null || sql.isEmpty) parameter.get(KEY_FLINK_SQL()) else
parameter.get(sql)
- require(flinkSql != null && flinkSql.trim.nonEmpty, "verify failed: flink
sql cannot be empty")
+ if (StringUtils.isBlank(sql)) parameter.get(KEY_FLINK_SQL()) else
parameter.get(sql)
+ require(StringUtils.isNotBlank(flinkSql), "verify failed: flink sql cannot
be empty")
def callback(r: String): Unit = {
callbackFunc match {
@@ -49,6 +51,8 @@ object FlinkSqlExecutor extends Logger {
}
}
+ val runMode = parameter.get(ExecutionOptions.RUNTIME_MODE.key())
+
var hasInsert = false
val statementSet = context.createStatementSet()
SqlCommandParser
@@ -120,6 +124,10 @@ object FlinkSqlExecutor extends Logger {
case SELECT =>
logError("StreamPark dose not support 'SELECT' statement now!")
throw new RuntimeException("StreamPark dose not support 'select'
statement now!")
+ case DELETE | UPDATE =>
+ if (runMode == "STREAMING") {
+ throw new UnsupportedOperationException(s"$command unsupported
streaming mode.")
+ }
case _ =>
try {
lock.lock()
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index e77bf5349..1f00d0fce 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -391,6 +391,12 @@ object SqlCommand extends enumeratum.Enum[SqlCommand] {
case object END_STATEMENT_SET
extends SqlCommand("end statement set", "END", Converters.NO_OPERANDS)
+ // Since: 2.1.2 for flink 1.18
+ case object DELETE extends SqlCommand("delete", "(DELETE\\s+FROM\\s+.+)")
+
+ // Since: 2.1.2 for flink 1.18
+ case object UPDATE extends SqlCommand("update", "(UPDATE\\s+.+)")
+
private[this] def cleanUp(sql: String): String =
sql.trim.replaceAll("^(['\"])|(['\"])$", "")
}
diff --git
a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index 76239c6cb..c4be0ab58 100644
---
a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++
b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -22,19 +22,24 @@ import org.apache.streampark.common.util.{DeflaterUtils,
PropertiesUtils}
import org.apache.streampark.flink.core.{SqlCommand, SqlCommandParser}
import org.apache.streampark.flink.core.scala.{FlinkStreamTable, FlinkTable}
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.ExecutionOptions
+import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import scala.util.{Failure, Success, Try}
object SqlClient extends App {
+ val arguments = ArrayBuffer(args: _*)
+
private[this] val parameterTool = ParameterTool.fromArgs(args)
private[this] val flinkSql = {
val sql = parameterTool.get(KEY_FLINK_SQL())
- require(sql != null && sql.trim.nonEmpty, "Usage: flink sql cannot be
null")
+ require(StringUtils.isNotBlank(sql), "Usage: flink sql cannot be null")
Try(DeflaterUtils.unzipString(sql)) match {
case Success(value) => value
case Failure(_) =>
@@ -44,33 +49,37 @@ object SqlClient extends App {
private[this] val sets =
SqlCommandParser.parseSQL(flinkSql).filter(_.command == SqlCommand.SET)
- private[this] val defaultMode = "streaming"
+ private[this] val defaultMode = RuntimeExecutionMode.STREAMING.name()
private[this] val mode = sets.find(_.operands.head ==
ExecutionOptions.RUNTIME_MODE.key()) match {
case Some(e) =>
// 1) flink sql execution.runtime-mode has highest priority
- e.operands(1)
+ val m = e.operands(1).toUpperCase()
+ arguments += s"-D${ExecutionOptions.RUNTIME_MODE.key()}=$m"
+ m
case None =>
// 2) dynamic properties execution.runtime-mode
parameterTool.get(ExecutionOptions.RUNTIME_MODE.key(), null) match {
case null =>
- parameterTool.get(KEY_APP_CONF(), null) match {
+ val m = parameterTool.get(KEY_APP_CONF(), null) match {
case null => defaultMode
case f =>
val parameter =
PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(f.drop(7)))
// 3) application conf execution.runtime-mode
- parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode)
+ parameter.getOrElse(KEY_FLINK_TABLE_MODE,
defaultMode).toUpperCase()
}
+ arguments += s"-D${ExecutionOptions.RUNTIME_MODE.key()}=$m"
+ m
case m => m
}
}
mode match {
- case "batch" => BatchSqlApp.main(args)
- case "streaming" => StreamSqlApp.main(args)
+ case "STREAMING" | "AUTOMATIC" => StreamSqlApp.main(arguments.toArray)
+ case "BATCH" => BatchSqlApp.main(arguments.toArray)
case _ =>
throw new IllegalArgumentException(
- "Usage: runtime execution-mode invalid, optional [streaming|batch]")
+ "Usage: runtime execution-mode invalid, optional
[STREAMING|BATCH|AUTOMATIC]")
}
private[this] object BatchSqlApp extends FlinkTable {