This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 598d36e52 [Improve] Shims base module code optimization (#3141)
598d36e52 is described below

commit 598d36e5259b42a6eb5d4ee8827873e0ffead745
Author: ChengJie1053 <[email protected]>
AuthorDate: Wed Sep 20 08:41:53 2023 +0800

    [Improve] Shims base module code optimization (#3141)
    
    * [Improve] Shims base module code optimization
    
    * Modify SqlClient
---
 .../org/apache/streampark/flink/core/FlinkSqlExecutor.scala |  5 +++--
 .../apache/streampark/flink/core/FlinkSqlValidator.scala    | 13 ++-----------
 .../org/apache/streampark/flink/core/SqlCommandParser.scala |  9 +++++----
 .../scala/org/apache/streampark/flink/cli/SqlClient.scala   |  6 ++++--
 4 files changed, 14 insertions(+), 19 deletions(-)

diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index ee1aa7a92..8d18a31e2 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -20,6 +20,7 @@ import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.core.SqlCommand._
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.api.TableEnvironment
@@ -39,8 +40,8 @@ object FlinkSqlExecutor extends Logger {
       parameter: ParameterTool,
      context: TableEnvironment)(implicit callbackFunc: String => Unit = null): Unit = {
     val flinkSql: String =
-      if (sql == null || sql.isEmpty) parameter.get(KEY_FLINK_SQL()) else parameter.get(sql)
-    require(flinkSql != null && flinkSql.trim.nonEmpty, "verify failed: flink sql cannot be empty")
+      if (StringUtils.isBlank(sql)) parameter.get(KEY_FLINK_SQL()) else parameter.get(sql)
+    require(StringUtils.isNotBlank(flinkSql), "verify failed: flink sql cannot be empty")
 
     def callback(r: String): Unit = {
       callbackFunc match {
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index 36cc11d02..26c31f928 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -23,10 +23,8 @@ import org.apache.streampark.flink.core.SqlCommand._
 import org.apache.calcite.config.Lex
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.parser.SqlParser.Config
-import org.apache.flink.api.common.RuntimeExecutionMode
-import org.apache.flink.configuration.ExecutionOptions
 import org.apache.flink.sql.parser.validate.FlinkSqlConformance
-import org.apache.flink.table.api.{SqlDialect, TableConfig}
+import org.apache.flink.table.api.SqlDialect
 import org.apache.flink.table.api.SqlDialect.{DEFAULT, HIVE}
 import org.apache.flink.table.api.config.TableConfigOptions
 import org.apache.flink.table.planner.delegation.FlinkSqlParserFactories
@@ -45,13 +43,6 @@ object FlinkSqlValidator extends Logger {
 
   private[this] lazy val sqlParserConfigMap: Map[String, SqlParser.Config] = {
     def getConfig(sqlDialect: SqlDialect): Config = {
-      val tableConfig = new TableConfig()
-      tableConfig.getConfiguration.set(
-        ExecutionOptions.RUNTIME_MODE,
-        RuntimeExecutionMode.STREAMING)
-      tableConfig.getConfiguration.set(
-        TableConfigOptions.TABLE_SQL_DIALECT,
-        sqlDialect.name().toLowerCase())
       val conformance = sqlDialect match {
         case HIVE => FlinkSqlConformance.HIVE
         case DEFAULT => FlinkSqlConformance.DEFAULT
@@ -71,7 +62,7 @@ object FlinkSqlValidator extends Logger {
 
   def verifySql(sql: String): FlinkSqlValidationResult = {
     val sqlCommands = SqlCommandParser.parseSQL(sql, r => return r)
-    var sqlDialect = "default"
+    var sqlDialect = SqlDialect.DEFAULT.name().toLowerCase()
     var hasInsert = false
     for (call <- sqlCommands) {
       val args = call.operands.head
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index 492e6e9cf..0d21aab5c 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.enums.FlinkSqlValidationFailedType
 import org.apache.streampark.common.util.Logger
 
 import enumeratum.EnumEntry
+import org.apache.commons.lang3.StringUtils
 
 import java.lang.{Boolean => JavaBool}
 import java.util.Scanner
@@ -37,7 +38,7 @@ object SqlCommandParser extends Logger {
       sql: String,
       validationCallback: FlinkSqlValidationResult => Unit = null): List[SqlCommandCall] = {
     val sqlEmptyError = "verify failed: flink sql cannot be empty."
-    require(sql != null && sql.trim.nonEmpty, sqlEmptyError)
+    require(StringUtils.isNotBlank(sql), sqlEmptyError)
     val sqlSegments = SqlSplitter.splitSql(sql)
     sqlSegments match {
       case s if s.isEmpty =>
@@ -121,7 +122,7 @@ sealed abstract class SqlCommand(
   var matcher: Matcher = _
 
   def matches(input: String): Boolean = {
-    if (regex == null) false
+    if (StringUtils.isBlank(regex)) false
     else {
       val pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
       matcher = pattern.matcher(input)
@@ -433,7 +434,7 @@ object SqlSplitter {
    */
   def splitSql(sql: String): List[SqlSegment] = {
     val queries = ListBuffer[String]()
-    val lastIndex = if (sql != null && sql.nonEmpty) sql.length - 1 else 0
+    val lastIndex = if (StringUtils.isNotBlank(sql)) sql.length - 1 else 0
     var query = new mutable.StringBuilder
 
     var multiLineComment = false
@@ -454,7 +455,7 @@ object SqlSplitter {
       while (scanner.hasNextLine) {
         lineNumber += 1
         val line = scanner.nextLine().trim
-        val nonEmpty = line.nonEmpty && !line.startsWith(PARAM_PREFIX)
+        val nonEmpty = StringUtils.isNotBlank(line) && !line.startsWith(PARAM_PREFIX)
         if (line.startsWith("/*")) {
           startComment = true
           hasComment = true
diff --git a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index 76239c6cb..3b094d086 100644
--- a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++ b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -22,6 +22,8 @@ import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
 import org.apache.streampark.flink.core.{SqlCommand, SqlCommandParser}
 import org.apache.streampark.flink.core.scala.{FlinkStreamTable, FlinkTable}
 
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.ExecutionOptions
 
@@ -34,7 +36,7 @@ object SqlClient extends App {
 
   private[this] val flinkSql = {
     val sql = parameterTool.get(KEY_FLINK_SQL())
-    require(sql != null && sql.trim.nonEmpty, "Usage: flink sql cannot be null")
+    require(StringUtils.isNotBlank(sql), "Usage: flink sql cannot be null")
     Try(DeflaterUtils.unzipString(sql)) match {
       case Success(value) => value
       case Failure(_) =>
@@ -44,7 +46,7 @@ object SqlClient extends App {
 
   private[this] val sets = SqlCommandParser.parseSQL(flinkSql).filter(_.command == SqlCommand.SET)
 
-  private[this] val defaultMode = "streaming"
+  private[this] val defaultMode = RuntimeExecutionMode.STREAMING.name().toLowerCase()
 
   private[this] val mode = sets.find(_.operands.head == ExecutionOptions.RUNTIME_MODE.key()) match {
     case Some(e) =>

Reply via email to