This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new c97f4fcd4 [Bug] support flinksql set syntax (#1926)
c97f4fcd4 is described below
commit c97f4fcd48fd93a0f61325e0927ffcda9df1ebfb
Author: benjobs <[email protected]>
AuthorDate: Sun Oct 30 19:23:19 2022 +0800
[Bug] support flinksql set syntax (#1926)
---
.../streampark/flink/core/FlinkSqlExecutor.scala | 45 ++--------------------
.../streampark/flink/core/FlinkSqlValidator.scala | 10 -----
2 files changed, 3 insertions(+), 52 deletions(-)
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index b810ea636..143487b45 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -17,56 +17,21 @@
package org.apache.streampark.flink.core
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.{ConfigOption, Configuration,
ExecutionOptions}
+import org.apache.flink.configuration.Configuration
import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.config.{ExecutionConfigOptions,
OptimizerConfigOptions, TableConfigOptions}
import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.core.SqlCommand._
import java.util
import java.util.concurrent.locks.ReentrantReadWriteLock
-import java.util.{HashMap => JavaHashMap, Map => JavaMap}
import scala.collection.mutable
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
object FlinkSqlExecutor extends Logger {
private[this] val lock = new ReentrantReadWriteLock().writeLock
- /**
- * all the available sql config options.
- * see:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config
- */
- lazy val tableConfigOptions: JavaMap[String, ConfigOption[_]] = {
- def extractConfig(clazz: Class[_]): JavaMap[String, ConfigOption[_]] = {
- val configOptions = new JavaHashMap[String, ConfigOption[_]]
- clazz.getDeclaredFields.foreach(field => {
- if (field.getType.isAssignableFrom(classOf[ConfigOption[_]])) {
- Try {
- val configOption =
field.get(classOf[ConfigOption[_]]).asInstanceOf[ConfigOption[_]]
- configOptions.put(configOption.key, configOption)
- } match {
- case Success(_) =>
- case Failure(e) => logError("Fail to get ConfigOption", e)
- }
- }
- })
- configOptions
- }
-
- val configOptions = new JavaHashMap[String, ConfigOption[_]]
- val configList = List(
- //classOf[PythonOptions],
- classOf[ExecutionOptions],
- classOf[ExecutionConfigOptions],
- classOf[OptimizerConfigOptions],
- classOf[TableConfigOptions]
- )
- configList.foreach(x => configOptions.putAll(extractConfig(x)))
- configOptions
- }
-
private[streampark] def executeSql(sql: String, parameter: ParameterTool,
context: TableEnvironment)(implicit callbackFunc: String => Unit = null): Unit
= {
val flinkSql: String = if (sql == null || sql.isEmpty)
parameter.get(KEY_FLINK_SQL()) else parameter.get(sql)
require(flinkSql != null && flinkSql.trim.nonEmpty, "verify failed: flink
sql cannot be empty")
@@ -118,15 +83,11 @@ object FlinkSqlExecutor extends Logger {
val tableResult = context.executeSql(x.originSql)
val r = tableResult.collect().next().getField(0).toString
callback(r)
-
// For specific statement, such as: SET/RESET/INSERT/SELECT
case SET =>
- if (!tableConfigOptions.containsKey(args)) {
- throw new IllegalArgumentException(s"$args is not a valid
table/sql config, please check link:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config")
- }
val operand = x.operands(1)
- context.getConfig.getConfiguration.setString(args, operand)
logInfo(s"$command: $args --> $operand")
+ context.getConfig.getConfiguration.setString(args, operand)
case RESET | RESET_ALL =>
val confDataField =
classOf[Configuration].getDeclaredField("confData")
confDataField.setAccessible(true)
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index ac39c3e1d..760586358 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -72,16 +72,6 @@ object FlinkSqlValidator extends Logger {
lazy val command = call.command
command match {
case SET | RESET =>
- // if (!FlinkSqlExecutor.tableConfigOptions.containsKey(args)) {
- // return FlinkSqlValidationResult(
- // success = false,
- // failedType = FlinkSqlValidationFailedType.VERIFY_FAILED,
- // lineStart = call.lineStart,
- // lineEnd = call.lineEnd,
- // sql = sql.replaceFirst(";|$", ";"),
- // exception = s"$args is not a valid table/sql config"
- // )
- // }
if (command == SET && args ==
TableConfigOptions.TABLE_SQL_DIALECT.key()) {
sqlDialect = call.operands.last
}