This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new d11fafe2f [bug] [fix] [streampark-flink-shims-base] Repair multiple
dialect set statement have no effect error. (#1826)
d11fafe2f is described below
commit d11fafe2f804a6f96d46c54c1b4cd91bd47949c7
Author: 第一片心意 <[email protected]>
AuthorDate: Tue Oct 25 08:51:17 2022 +0800
[bug] [fix] [streampark-flink-shims-base] Repair multiple dialect set
statement have no effect error. (#1826)
* Fix the problem that resources of the flink job manager role are not
automatically released after the flink sql batch task is successfully run in
yarn per job mode.
* Fixed an issue where hive sql could not be run due to the platform
changing the flink sql order, which could not load hive dialect correctly.
* Remove the validity check for keys in the set statement, because the
platform is not yet able to aggregate all keys
---
.../streampark/flink/core/FlinkSqlExecutor.scala | 18 ++++++-------
.../streampark/flink/core/FlinkSqlValidator.scala | 30 ++++++++++++----------
.../streampark/flink/core/SqlCommandParser.scala | 2 +-
.../flink/submit/impl/YarnPerJobSubmit.scala | 2 +-
4 files changed, 27 insertions(+), 25 deletions(-)
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index d65d5c46b..b810ea636 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -16,19 +16,18 @@
*/
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL
-import org.apache.streampark.common.util.Logger
-import org.apache.streampark.flink.core.SqlCommand._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.{ConfigOption, Configuration,
ExecutionOptions}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.config.{ExecutionConfigOptions,
OptimizerConfigOptions, TableConfigOptions}
+import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL
+import org.apache.streampark.common.util.Logger
+import org.apache.streampark.flink.core.SqlCommand._
import java.util
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.{HashMap => JavaHashMap, Map => JavaMap}
import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
object FlinkSqlExecutor extends Logger {
@@ -79,7 +78,8 @@ object FlinkSqlExecutor extends Logger {
}
}
- val insertArray = new ArrayBuffer[String]()
+ var hasInsert = false
+ val statementSet = context.createStatementSet()
SqlCommandParser.parseSQL(flinkSql).foreach(x => {
val args = if (x.operands.isEmpty) null else x.operands.head
val command = x.command.name
@@ -141,7 +141,9 @@ object FlinkSqlExecutor extends Logger {
logInfo(s"$command: $args")
case BEGIN_STATEMENT_SET | END_STATEMENT_SET =>
logWarn(s"SQL Client Syntax: ${x.command.name} ")
- case INSERT => insertArray += x.originSql
+ case INSERT =>
+ statementSet.addInsertSql(x.originSql)
+ hasInsert = true
case SELECT =>
logError("StreamPark dose not support 'SELECT' statement now!")
throw new RuntimeException("StreamPark dose not support 'select'
statement now!")
@@ -157,9 +159,7 @@ object FlinkSqlExecutor extends Logger {
}
})
- if (insertArray.nonEmpty) {
- val statementSet = context.createStatementSet()
- insertArray.foreach(statementSet.addInsertSql)
+ if (hasInsert) {
statementSet.execute() match {
case t if t != null =>
Try(t.getJobClient.get.getJobID).getOrElse(null) match {
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index cfd92eb14..ac39c3e1d 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -16,9 +16,6 @@
*/
package org.apache.streampark.flink.core
-import org.apache.streampark.common.enums.FlinkSqlValidationFailedType
-import org.apache.streampark.common.util.{ExceptionUtils, Logger}
-import org.apache.streampark.flink.core.SqlCommand._
import org.apache.calcite.config.Lex
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.parser.SqlParser.Config
@@ -26,9 +23,12 @@ import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.sql.parser.validate.FlinkSqlConformance
import org.apache.flink.table.api.SqlDialect.{DEFAULT, HIVE}
-import org.apache.flink.table.api.{SqlDialect, TableConfig}
import org.apache.flink.table.api.config.TableConfigOptions
+import org.apache.flink.table.api.{SqlDialect, TableConfig}
import org.apache.flink.table.planner.delegation.FlinkSqlParserFactories
+import org.apache.streampark.common.enums.FlinkSqlValidationFailedType
+import org.apache.streampark.common.util.{ExceptionUtils, Logger}
+import org.apache.streampark.flink.core.SqlCommand._
import scala.util.{Failure, Try}
@@ -72,16 +72,16 @@ object FlinkSqlValidator extends Logger {
lazy val command = call.command
command match {
case SET | RESET =>
- if (!FlinkSqlExecutor.tableConfigOptions.containsKey(args)) {
- return FlinkSqlValidationResult(
- success = false,
- failedType = FlinkSqlValidationFailedType.VERIFY_FAILED,
- lineStart = call.lineStart,
- lineEnd = call.lineEnd,
- sql = sql.replaceFirst(";|$", ";"),
- exception = s"$args is not a valid table/sql config"
- )
- }
+ // if (!FlinkSqlExecutor.tableConfigOptions.containsKey(args)) {
+ // return FlinkSqlValidationResult(
+ // success = false,
+ // failedType = FlinkSqlValidationFailedType.VERIFY_FAILED,
+ // lineStart = call.lineStart,
+ // lineEnd = call.lineEnd,
+ // sql = sql.replaceFirst(";|$", ";"),
+ // exception = s"$args is not a valid table/sql config"
+ // )
+ // }
if (command == SET && args ==
TableConfigOptions.TABLE_SQL_DIALECT.key()) {
sqlDialect = call.operands.last
}
@@ -98,10 +98,12 @@ object FlinkSqlValidator extends Logger {
case _ =>
throw new UnsupportedOperationException(s"unsupported dialect:
${sqlDialect}")
}
+
val parser = calciteClass.getConstructor(Array(classOf[Config]):
_*).newInstance(sqlParserConfigMap(sqlDialect.toUpperCase()))
val method = parser.getClass.getDeclaredMethod("parse",
classOf[String])
method.setAccessible(true)
method.invoke(parser, call.originSql)
+
} match {
case Failure(e) =>
val exception = ExceptionUtils.stringifyException(e)
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index 7274e8823..7df3c8acf 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -84,7 +84,7 @@ object SqlCommandParser extends Logger {
} else {
throw new UnsupportedOperationException("flink sql syntax error,
no executable sql")
}
- case r => r.sortWith((a, b) => a.lineStart < b.lineStart)
+ case r => r
}
}
}
diff --git
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala
index 664954030..e72e4a8e0 100644
---
a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala
+++
b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala
@@ -96,7 +96,7 @@ object YarnPerJobSubmit extends YarnSubmitTrait {
submitRequest.effectiveAppName,
classOf[YarnJobClusterEntrypoint].getName,
jobGraph,
- false
+ true
).getClusterClient
}