This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 598d36e52 [Improve] Shims base module code optimization (#3141)
598d36e52 is described below
commit 598d36e5259b42a6eb5d4ee8827873e0ffead745
Author: ChengJie1053 <[email protected]>
AuthorDate: Wed Sep 20 08:41:53 2023 +0800
[Improve] Shims base module code optimization (#3141)
* [Improve] Shims base module code optimization
* Modify SqlClient
---
.../org/apache/streampark/flink/core/FlinkSqlExecutor.scala | 5 +++--
.../apache/streampark/flink/core/FlinkSqlValidator.scala | 13 ++-----------
.../org/apache/streampark/flink/core/SqlCommandParser.scala | 9 +++++----
.../scala/org/apache/streampark/flink/cli/SqlClient.scala | 6 ++++--
4 files changed, 14 insertions(+), 19 deletions(-)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index ee1aa7a92..8d18a31e2 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -20,6 +20,7 @@ import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.core.SqlCommand._
+import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.api.TableEnvironment
@@ -39,8 +40,8 @@ object FlinkSqlExecutor extends Logger {
parameter: ParameterTool,
context: TableEnvironment)(implicit callbackFunc: String => Unit =
null): Unit = {
val flinkSql: String =
- if (sql == null || sql.isEmpty) parameter.get(KEY_FLINK_SQL()) else parameter.get(sql)
- require(flinkSql != null && flinkSql.trim.nonEmpty, "verify failed: flink sql cannot be empty")
+ if (StringUtils.isBlank(sql)) parameter.get(KEY_FLINK_SQL()) else parameter.get(sql)
+ require(StringUtils.isNotBlank(flinkSql), "verify failed: flink sql cannot be empty")
def callback(r: String): Unit = {
callbackFunc match {
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index 36cc11d02..26c31f928 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -23,10 +23,8 @@ import org.apache.streampark.flink.core.SqlCommand._
import org.apache.calcite.config.Lex
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.parser.SqlParser.Config
-import org.apache.flink.api.common.RuntimeExecutionMode
-import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.sql.parser.validate.FlinkSqlConformance
-import org.apache.flink.table.api.{SqlDialect, TableConfig}
+import org.apache.flink.table.api.SqlDialect
import org.apache.flink.table.api.SqlDialect.{DEFAULT, HIVE}
import org.apache.flink.table.api.config.TableConfigOptions
import org.apache.flink.table.planner.delegation.FlinkSqlParserFactories
@@ -45,13 +43,6 @@ object FlinkSqlValidator extends Logger {
private[this] lazy val sqlParserConfigMap: Map[String, SqlParser.Config] = {
def getConfig(sqlDialect: SqlDialect): Config = {
- val tableConfig = new TableConfig()
- tableConfig.getConfiguration.set(
- ExecutionOptions.RUNTIME_MODE,
- RuntimeExecutionMode.STREAMING)
- tableConfig.getConfiguration.set(
- TableConfigOptions.TABLE_SQL_DIALECT,
- sqlDialect.name().toLowerCase())
val conformance = sqlDialect match {
case HIVE => FlinkSqlConformance.HIVE
case DEFAULT => FlinkSqlConformance.DEFAULT
@@ -71,7 +62,7 @@ object FlinkSqlValidator extends Logger {
def verifySql(sql: String): FlinkSqlValidationResult = {
val sqlCommands = SqlCommandParser.parseSQL(sql, r => return r)
- var sqlDialect = "default"
+ var sqlDialect = SqlDialect.DEFAULT.name().toLowerCase()
var hasInsert = false
for (call <- sqlCommands) {
val args = call.operands.head
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index 492e6e9cf..0d21aab5c 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.enums.FlinkSqlValidationFailedType
import org.apache.streampark.common.util.Logger
import enumeratum.EnumEntry
+import org.apache.commons.lang3.StringUtils
import java.lang.{Boolean => JavaBool}
import java.util.Scanner
@@ -37,7 +38,7 @@ object SqlCommandParser extends Logger {
sql: String,
validationCallback: FlinkSqlValidationResult => Unit = null): List[SqlCommandCall] = {
val sqlEmptyError = "verify failed: flink sql cannot be empty."
- require(sql != null && sql.trim.nonEmpty, sqlEmptyError)
+ require(StringUtils.isNotBlank(sql), sqlEmptyError)
val sqlSegments = SqlSplitter.splitSql(sql)
sqlSegments match {
case s if s.isEmpty =>
@@ -121,7 +122,7 @@ sealed abstract class SqlCommand(
var matcher: Matcher = _
def matches(input: String): Boolean = {
- if (regex == null) false
+ if (StringUtils.isBlank(regex)) false
else {
val pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
matcher = pattern.matcher(input)
@@ -433,7 +434,7 @@ object SqlSplitter {
*/
def splitSql(sql: String): List[SqlSegment] = {
val queries = ListBuffer[String]()
- val lastIndex = if (sql != null && sql.nonEmpty) sql.length - 1 else 0
+ val lastIndex = if (StringUtils.isNotBlank(sql)) sql.length - 1 else 0
var query = new mutable.StringBuilder
var multiLineComment = false
@@ -454,7 +455,7 @@ object SqlSplitter {
while (scanner.hasNextLine) {
lineNumber += 1
val line = scanner.nextLine().trim
- val nonEmpty = line.nonEmpty && !line.startsWith(PARAM_PREFIX)
+ val nonEmpty = StringUtils.isNotBlank(line) && !line.startsWith(PARAM_PREFIX)
if (line.startsWith("/*")) {
startComment = true
hasComment = true
diff --git a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index 76239c6cb..3b094d086 100644
--- a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++ b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -22,6 +22,8 @@ import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
import org.apache.streampark.flink.core.{SqlCommand, SqlCommandParser}
import org.apache.streampark.flink.core.scala.{FlinkStreamTable, FlinkTable}
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.ExecutionOptions
@@ -34,7 +36,7 @@ object SqlClient extends App {
private[this] val flinkSql = {
val sql = parameterTool.get(KEY_FLINK_SQL())
- require(sql != null && sql.trim.nonEmpty, "Usage: flink sql cannot be null")
+ require(StringUtils.isNotBlank(sql), "Usage: flink sql cannot be null")
Try(DeflaterUtils.unzipString(sql)) match {
case Success(value) => value
case Failure(_) =>
@@ -44,7 +46,7 @@ object SqlClient extends App {
private[this] val sets =
SqlCommandParser.parseSQL(flinkSql).filter(_.command == SqlCommand.SET)
- private[this] val defaultMode = "streaming"
+ private[this] val defaultMode = RuntimeExecutionMode.STREAMING.name().toLowerCase()
private[this] val mode = sets.find(_.operands.head == ExecutionOptions.RUNTIME_MODE.key()) match {
case Some(e) =>